Skip to content

Commit

Permalink
fix: Set IP version & Add JWT e2e tests (#1254)
Browse files Browse the repository at this point in the history
Set IP version & Add JWT e2e tests
  • Loading branch information
filipecabaco authored Jan 6, 2025
1 parent b29c51e commit 97d4498
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 31 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| DB_NAME | string | Postgres database name |
| DB_ENC_KEY | string | Key used to encrypt sensitive fields in \_realtime.tenants and \_realtime.extensions tables. Recommended: 16 characters. |
| DB_AFTER_CONNECT_QUERY | string | Query that is run after server connects to database. |
| DB_IP_VERSION | string | Sets the IP Version to be used. Allowed values are "ipv6" and "ipv4". If none are set we will try to infer the correct version |
| API_JWT_SECRET | string | Secret that is used to sign tokens used to manage tenants and their extensions via HTTP requests. |
| SECRET_KEY_BASE | string | Secret used by the server to sign cookies. Recommended: 64 characters. |
| ERL_AFLAGS | string | Set to either "-proto_dist inet_tcp" or "-proto_dist inet6_tcp" depending on whether or not your network uses IPv4 or IPv6, respectively. |
Expand Down
19 changes: 16 additions & 3 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,25 @@ username = System.get_env("DB_USER", "postgres")
password = System.get_env("DB_PASSWORD", "postgres")
database = System.get_env("DB_NAME", "postgres")
port = System.get_env("DB_PORT", "5432")
db_version = System.get_env("DB_IP_VERSION")
slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")

if !(db_version in [nil, "ipv6", "ipv4"]),
do: raise("Invalid IP version, please set either ipv6 or ipv4")

socket_options =
case Realtime.Database.detect_ip_version(default_db_host) do
{:ok, ip_version} -> [ip_version]
{:error, reason} -> raise "Failed to detect IP version for DB_HOST: #{reason}"
cond do
db_version == "ipv6" ->
[:inet6]

db_version == "ipv4" ->
[:inet]

true ->
case Realtime.Database.detect_ip_version(default_db_host) do
{:ok, ip_version} -> [ip_version]
{:error, reason} -> raise "Failed to detect IP version for DB_HOST: #{reason}"
end
end

config :realtime,
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.73",
version: "2.33.74",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
1 change: 1 addition & 0 deletions test/e2e/.template.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
PROJECT_URL=
PROJECT_ANON_TOKEN=
PROJECT_JWT_SECRET
25 changes: 20 additions & 5 deletions test/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,41 @@ CREATE TABLE public.broadcast_changes (
value text NOT NULL
);

CREATE TABLE public.wallet (
id text PRIMARY KEY,
wallet_id text NOT NULL
);
INSERT INTO public.wallet (wallet_id) VALUES (1, 'wallet_1');

ALTER TABLE public.pg_changes ENABLE ROW LEVEL SECURITY;

ALTER TABLE public.authorization ENABLE ROW LEVEL SECURITY;

ALTER TABLE public.broadcast_changes ENABLE ROW LEVEL SECURITY;

ALTER TABLE public.wallet ENABLE ROW LEVEL SECURITY;

ALTER PUBLICATION supabase_realtime
ADD TABLE public.pg_changes;

ALTER PUBLICATION supabase_realtime
ADD TABLE public.dummy;

CREATE POLICY "authenticated have full access to read" ON "realtime"."messages" AS PERMISSIVE
CREATE POLICY "authenticated receive on topic" ON "realtime"."messages" AS PERMISSIVE
FOR SELECT TO authenticated
USING (TRUE);
USING ( realtime.topic() like 'topic:%');

CREATE POLICY "authenticated have full access to write" ON "realtime"."messages" AS PERMISSIVE
CREATE POLICY "authenticated broadcast on topic" ON "realtime"."messages" AS PERMISSIVE
FOR INSERT TO authenticated
WITH CHECK (TRUE);
WITH CHECK ( realtime.topic() like 'topic:%');

CREATE POLICY "authenticated jwt topic in wallet can receive" ON "realtime"."messages" AS PERMISSIVE
FOR SELECT TO authenticated
USING ( realtime.topic() like 'jwt_topic:%' AND select wallet_id from public.wallet where wallet_id = (auth.jwt() -> 'sub'));

CREATE POLICY "authenticated jwt topic in wallet can broadcast" ON "realtime"."messages" AS PERMISSIVE
FOR INSERT TO authenticated
WITH CHECK ( realtime.topic() like 'jwt_topic:%' AND select wallet_id from public.wallet where wallet_id = (auth.jwt() -> 'sub'));

CREATE POLICY "allow authenticated users all access" ON "public"."pg_changes" AS PERMISSIVE
FOR ALL TO authenticated
Expand All @@ -62,7 +78,6 @@ CREATE POLICY "authenticated have full access to read on broadcast_changes" ON "
FOR ALL TO authenticated
USING (TRUE);


CREATE OR REPLACE FUNCTION broadcast_changes_for_table_trigger ()
RETURNS TRIGGER
AS $$
Expand Down
80 changes: 58 additions & 22 deletions test/e2e/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import {
SupabaseClient,
RealtimeChannel,
} from "npm:@supabase/supabase-js@2.47.3";
import {
assert,
assertEquals,
} from "https://deno.land/std@0.224.0/assert/mod.ts";
import { assertEquals } from "https://deno.land/std@0.224.0/assert/mod.ts";
import { describe, it } from "https://deno.land/std@0.224.0/testing/bdd.ts";
import { sleep } from "https://deno.land/x/sleep/mod.ts";

import { JWTPayload, SignJWT } from "https://deno.land/x/jose@v5.9.4/index.ts";
const env = await load();
const url = env["PROJECT_URL"];
const token = env["PROJECT_ANON_TOKEN"];
const jwtSecret = env["PROJECT_JWT_SECRET"];
const realtime = { heartbeatIntervalMs: 500, timeout: 1000 };
const config = { config: { broadcast: { self: true } } };

Expand Down Expand Up @@ -64,13 +63,24 @@ const executeModifyDatabaseActions = async (
await supabase.from(table).delete().eq("id", id);
};

const generateJwtToken = async (payload: JWTPayload) => {
const secret = new TextEncoder().encode(jwtSecret);
const jwt = await new SignJWT(payload)
.setProtectedHeader({ alg: "HS256", typ: "JWT" })
.setIssuedAt()
.setExpirationTime("1h")
.sign(secret);

return jwt;
};

describe("broadcast extension", () => {
it("user is able to receive self broadcast", async () => {
let supabase = await createClient(url, token, { realtime });

let result = null;
let event = crypto.randomUUID();
let topic = crypto.randomUUID();
let topic = "topic:" + crypto.randomUUID();
let expectedPayload = { message: crypto.randomUUID() };

const channel = supabase
Expand All @@ -96,7 +106,7 @@ describe("broadcast extension", () => {

let result = null;
let event = crypto.randomUUID();
let topic = crypto.randomUUID();
let topic = "topic:" + crypto.randomUUID();
let expectedPayload = { message: crypto.randomUUID() };

const activeChannel = supabase
Expand All @@ -122,9 +132,8 @@ describe("postgres changes extension", () => {
let supabase = await createClient(url, token, { realtime });
await signInUser(supabase, "filipe@supabase.io", "test_test");
await supabase.realtime.setAuth();

let result: Array<any> = [];
let topic = crypto.randomUUID();
let topic = "topic:" + crypto.randomUUID();

let previousId = await executeCreateDatabaseActions(supabase, "pg_changes");
await executeCreateDatabaseActions(supabase, "dummy");
Expand Down Expand Up @@ -159,7 +168,7 @@ describe("postgres changes extension", () => {
await supabase.realtime.setAuth();

let result: Array<any> = [];
let topic = crypto.randomUUID();
let topic = "topic:" + crypto.randomUUID();

let mainId = await executeCreateDatabaseActions(supabase, "pg_changes");
let fakeId = await executeCreateDatabaseActions(supabase, "pg_changes");
Expand Down Expand Up @@ -198,7 +207,7 @@ describe("postgres changes extension", () => {
await supabase.realtime.setAuth();

let result: Array<any> = [];
let topic = crypto.randomUUID();
let topic = "topic:" + crypto.randomUUID();

let mainId = await executeCreateDatabaseActions(supabase, "pg_changes");
let fakeId = await executeCreateDatabaseActions(supabase, "pg_changes");
Expand Down Expand Up @@ -235,41 +244,68 @@ describe("postgres changes extension", () => {
describe("authorization check", () => {
it("user using private channel cannot connect if does not have enough permissions", async () => {
let supabase = await createClient(url, token, { realtime });
let result: any = null;
let topic = crypto.randomUUID();
let errMessage: any = null;

let topic = "topic:" + crypto.randomUUID();

const channel = supabase
.channel(topic, { config: { ...config, private: true } })
.channel(topic, { config: { private: true } })
.subscribe((status: string, err: any) => {
if (status == "CHANNEL_ERROR") {
result = err.message;
errMessage = err.message;
}
assert(status == "CHANNEL_ERROR" || status == "CLOSED");
});

await sleep(2);
await sleep(1);

await stopClient(supabase, [channel]);
assertEquals(
result,
errMessage,
`"You do not have permissions to read from this Channel topic: ${topic}"`
);
});

it("user using private channel can connect if they have enough permissions", async () => {
let topic = "topic:" + crypto.randomUUID();
let supabase = await createClient(url, token, { realtime });
let connected = false;
await signInUser(supabase, "filipe@supabase.io", "test_test");
await supabase.realtime.setAuth();

const channel = supabase
.channel(crypto.randomUUID(), { config: { ...config, private: true } })
.subscribe((status: string) =>
assert(status == "SUBSCRIBED" || status == "CLOSED")
);
.channel(topic, { config: { private: true } })
.subscribe((status: string) => {
if (status == "SUBSCRIBED") {
connected = true;
}
});

await sleep(2);
await sleep(1);
await supabase.auth.signOut();
await stopClient(supabase, [channel]);
assertEquals(connected, true);
});

it("user using private channel for jwt connections can connect if they have enough permissions based on claims", async () => {
let topic = "jwt_topic:" + crypto.randomUUID();
let supabase = await createClient(url, token, { realtime });
let connected = false;
let claims = { role: "authenticated", sub: "wallet_1" };
let jwt_token = await generateJwtToken(claims);

await supabase.realtime.setAuth(jwt_token);

const channel = supabase
.channel(topic, { config: { private: true } })
.subscribe((status: string, err: any) => {
if (status == "SUBSCRIBED") {
connected = true;
}
});

await sleep(1);
await stopClient(supabase, [channel]);
assertEquals(connected, true);
});
});

Expand Down

0 comments on commit 97d4498

Please sign in to comment.