Skip to content

Commit

Permalink
fix: Add partitioning to realtime.messages (#1186)
Browse files Browse the repository at this point in the history
Adds logic to partition the realtime.messages table. The current way works as follows:
* new table is created called realtime.messages_new that is partitioned by `inserted_at`
* migrate existing policies from realtime.messages to realtime.messages_new
* drop existing realtime.messages table
* rename realtime.messages_new to realtime.messages
* Assign the existing ID sequence to the new table
* Provide all required Grants
* On tenant connect, create dynamically 3 partitions: one for yesterday, tomorrow and today

Other importante notes:
* Table partition creation can also happen on function `realtime.send` which is used by other functions
* Table cleanup is done by dropping partitions
  • Loading branch information
filipecabaco authored Nov 6, 2024
1 parent a805d5a commit c419fce
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 24 deletions.
36 changes: 29 additions & 7 deletions lib/realtime/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,39 @@ defmodule Realtime.Messages do
@moduledoc """
Handles `realtime.messages` table operations
"""
import Ecto.Query
alias Realtime.Repo
alias Realtime.Api.Message

@doc """
Deletes messages older than 72 hours for a given tenant connection
"""
@spec delete_old_messages(pid()) :: {:ok, any()} | {:error, any()}
@spec delete_old_messages(pid()) :: :ok
def delete_old_messages(conn) do
limit = NaiveDateTime.utc_now() |> NaiveDateTime.add(-72, :hour)
query = from m in Message, where: m.inserted_at <= ^limit
Repo.del(conn, query)
limit =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(-72, :hour)
|> NaiveDateTime.to_date()

%{rows: rows} =
Postgrex.query!(
conn,
"""
SELECT child.relname
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname = 'messages'
AND nmsp_child.nspname = 'realtime'
""",
[]
)

rows
|> Enum.filter(fn ["messages_" <> date] ->
date |> String.replace("_", "-") |> Date.from_iso8601!() |> Date.compare(limit) == :lt
end)
|> Enum.each(&Postgrex.query!(conn, "DROP TABLE IF EXISTS realtime.#{&1}", []))

:ok
end
end
4 changes: 3 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters
alias Realtime.Tenants.Connect.CreatePartitions

@pipes [
GetTenant,
CheckConnection,
Migrations,
StartCounters,
StartReplication,
RegisterProcess
RegisterProcess,
CreatePartitions
]
@rpc_timeout_default 30_000
@check_connected_user_interval_default 50_000
Expand Down
34 changes: 34 additions & 0 deletions lib/realtime/tenants/connect/create_partitions.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Realtime.Tenants.Connect.CreatePartitions do
alias Realtime.Database

@behaviour Realtime.Tenants.Connect.Piper

@impl true
def run(%{db_conn_pid: db_conn_pid} = acc) do
today = Date.utc_today()
yesterday = Date.add(today, -1)
tomorrow = Date.add(today, 1)

dates = [yesterday, today, tomorrow]

Enum.each(dates, fn date ->
partition_name = "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}"
start_timestamp = Date.to_string(date)
end_timestamp = Date.to_string(Date.add(date, 1))

Database.transaction(db_conn_pid, fn conn ->
Postgrex.query(
conn,
"""
CREATE TABLE IF NOT EXISTS realtime.#{partition_name}
PARTITION OF realtime.messages
FOR VALUES FROM ('#{start_timestamp}') TO ('#{end_timestamp}');
""",
[]
)
end)
end)

{:ok, acc}
end
end
2 changes: 1 addition & 1 deletion lib/realtime/tenants/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ defmodule Realtime.Tenants.Janitor do
Logger.info("Janitor cleaned realtime.messages")

with {:ok, conn} <- Database.connect(tenant, "realtime_janitor", 1),
{:ok, _} <- Messages.delete_old_messages(conn) do
:ok <- Messages.delete_old_messages(conn) do
Logger.info("Janitor finished")
:ok
end
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ defmodule Realtime.Tenants.Migrations do
FilterDeletePostgresChanges,
AddPayloadToMessages,
ChangeMessagesIdType,
UuidAutoGeneration
UuidAutoGeneration,
MessagesPartitioning
}

@migrations [
Expand Down Expand Up @@ -115,7 +116,8 @@ defmodule Realtime.Tenants.Migrations do
{20_240_827_160_934, FilterDeletePostgresChanges},
{20_240_919_163_303, AddPayloadToMessages},
{20_240_919_163_305, ChangeMessagesIdType},
{20_241_019_105_805, UuidAutoGeneration}
{20_241_019_105_805, UuidAutoGeneration},
{20_241_030_150_047, MessagesPartitioning}
]
defstruct [:tenant_external_id, :settings]
@spec run_migrations(map()) :: :ok | {:error, any()}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
defmodule Realtime.Tenants.Migrations.MessagesPartitioning do
@moduledoc false
use Ecto.Migration

def change do
execute("""
CREATE TABLE IF NOT EXISTS realtime.messages_new (
id BIGSERIAL,
uuid TEXT DEFAULT gen_random_uuid(),
topic TEXT NOT NULL,
extension TEXT NOT NULL,
payload JSONB,
event TEXT,
private BOOLEAN DEFAULT FALSE,
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (id, inserted_at)
) PARTITION BY RANGE (inserted_at)
""")

execute("ALTER TABLE realtime.messages_new ENABLE ROW LEVEL SECURITY")

execute("""
DO $$
DECLARE
rec record;
sql text;
role_list text;
BEGIN
FOR rec IN
SELECT *
FROM pg_policies
WHERE schemaname = 'realtime'
AND tablename = 'messages'
LOOP
-- Start constructing the create policy statement
sql := 'CREATE POLICY ' || quote_ident(rec.policyname) ||
' ON realtime.messages_new ';
IF (rec.permissive = 'PERMISSIVE') THEN
sql := sql || 'AS PERMISSIVE ';
ELSE
sql := sql || 'AS RESTRICTIVE ';
END IF;
sql := sql || ' FOR ' || rec.cmd;
-- Include roles if specified
IF rec.roles IS NOT NULL AND array_length(rec.roles, 1) > 0 THEN
role_list := (
SELECT string_agg(quote_ident(role), ', ')
FROM unnest(rec.roles) AS role
);
sql := sql || ' TO ' || role_list;
END IF;
-- Include using clause if specified
IF rec.qual IS NOT NULL THEN
sql := sql || ' USING (' || rec.qual || ')';
END IF;
-- Include with check clause if specified
IF rec.with_check IS NOT NULL THEN
sql := sql || ' WITH CHECK (' || rec.with_check || ')';
END IF;
-- Output the constructed sql for debugging purposes
RAISE NOTICE 'Executing: %', sql;
-- Execute the constructed sql statement
EXECUTE sql;
END LOOP;
END
$$
""")

execute("ALTER TABLE realtime.messages RENAME TO messages_old")
execute("ALTER TABLE realtime.messages_new RENAME TO messages")
execute("DROP TABLE realtime.messages_old")

execute("CREATE SEQUENCE IF NOT EXISTS realtime.messages_id_seq")

execute(
"ALTER TABLE realtime.messages ALTER COLUMN id SET DEFAULT nextval('realtime.messages_id_seq')"
)

execute("ALTER table realtime.messages OWNER to supabase_realtime_admin")

execute(
"GRANT USAGE ON SEQUENCE realtime.messages_id_seq TO postgres, anon, authenticated, service_role"
)

execute("GRANT SELECT ON realtime.messages TO postgres, anon, authenticated, service_role")
execute("GRANT UPDATE ON realtime.messages TO postgres, anon, authenticated, service_role")
execute("GRANT INSERT ON realtime.messages TO postgres, anon, authenticated, service_role")

execute("ALTER TABLE realtime.messages ENABLE ROW LEVEL SECURITY")

execute("""
CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true)
RETURNS void
AS $$
DECLARE
partition_name text;
BEGIN
partition_name := 'messages_' || to_char(NOW(), 'YYYY_MM_DD');
IF NOT EXISTS (
SELECT 1
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'realtime'
AND c.relname = partition_name
) THEN
EXECUTE format(
'CREATE TABLE %I PARTITION OF realtime.messages FOR VALUES FROM (%L) TO (%L)',
partition_name,
NOW(),
(NOW() + interval '1 day')::timestamp
);
END IF;
INSERT INTO realtime.messages (payload, event, topic, private, extension)
VALUES (payload, event, topic, private, 'broadcast');
END;
$$
LANGUAGE plpgsql;
""")
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.18",
version: "2.33.19",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
11 changes: 5 additions & 6 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,6 @@ defmodule Realtime.Integration.RtChannelTest do
table_name: table_name
} do
value = random_string()
Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])
:timer.sleep(500)

{socket, _} = get_connection("authenticated")
config = %{broadcast: %{self: true}, private: true}
Expand All @@ -654,6 +652,8 @@ defmodule Realtime.Integration.RtChannelTest do
:timer.sleep(500)
new_value = random_string()

Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])

Postgrex.query!(db_conn, "UPDATE #{table_name} SET details = $1 WHERE details = $2", [
new_value,
value
Expand Down Expand Up @@ -691,10 +691,6 @@ defmodule Realtime.Integration.RtChannelTest do
db_conn: db_conn,
table_name: table_name
} do
value = random_string()
Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])
:timer.sleep(500)

{socket, _} = get_connection("authenticated")
config = %{broadcast: %{self: true}, private: true}
topic = "realtime:#{topic}"
Expand All @@ -703,6 +699,9 @@ defmodule Realtime.Integration.RtChannelTest do
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
assert_receive %Message{event: "presence_state"}, 500
:timer.sleep(500)
value = random_string()

Postgrex.query!(db_conn, "INSERT INTO #{table_name} (details) VALUES ($1)", [value])
Postgrex.query!(db_conn, "DELETE FROM #{table_name} WHERE details = $1", [value])

record = %{"details" => value, "id" => 1}
Expand Down
21 changes: 15 additions & 6 deletions test/realtime/messages_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,36 @@ defmodule Realtime.MessagesTest do

{:ok, conn} = Database.connect(tenant, "realtime_test", 1)
clean_table(conn, "realtime", "messages")

%{conn: conn, tenant: tenant}
date_start = Date.utc_today() |> Date.add(-10)
date_end = Date.utc_today()
create_messages_partitions(conn, date_start, date_end)
%{conn: conn, tenant: tenant, date_start: date_start, date_end: date_end}
end

test "delete_old_messages/1 deletes messages older than 72 hours", %{conn: conn, tenant: tenant} do
test "delete_old_messages/1 deletes messages older than 72 hours", %{
conn: conn,
tenant: tenant,
date_start: date_start,
date_end: date_end
} do
utc_now = NaiveDateTime.utc_now()
limit = NaiveDateTime.add(utc_now, -72, :hour)

messages =
for days <- -5..0 do
inserted_at = NaiveDateTime.add(utc_now, days, :day)
for date <- Date.range(date_start, date_end) do
inserted_at = date |> NaiveDateTime.new!(Time.new!(0, 0, 0))
message_fixture(tenant, %{inserted_at: inserted_at})
end

assert length(messages) == 11

to_keep =
Enum.reject(
messages,
&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)
)

Messages.delete_old_messages(conn)
assert :ok = Messages.delete_old_messages(conn)
{:ok, current} = Repo.all(conn, from(m in Message), Message)

assert current == to_keep
Expand Down
19 changes: 19 additions & 0 deletions test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule Generators do

def message_fixture(tenant, override \\ %{}) do
{:ok, db_conn} = Database.connect(tenant, "realtime_test", 1)
Realtime.Tenants.Connect.CreatePartitions.run(%{db_conn_pid: db_conn})

create_attrs = %{
"topic" => random_string(),
Expand Down Expand Up @@ -91,6 +92,24 @@ defmodule Generators do
end)
end

def create_messages_partitions(db_conn, start_date, end_date) do
Enum.each(Date.range(start_date, end_date), fn date ->
partition_name = "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}"
start_timestamp = Date.to_string(date)
end_timestamp = Date.to_string(Date.add(date, 1))

Postgrex.query!(
db_conn,
"""
CREATE TABLE IF NOT EXISTS realtime.#{partition_name}
PARTITION OF realtime.messages
FOR VALUES FROM ('#{start_timestamp}') TO ('#{end_timestamp}');
""",
[]
)
end)
end

@doc """
Creates support RLS policies given a name and params to be used by the policies
Supported:
Expand Down

0 comments on commit c419fce

Please sign in to comment.