-
-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #76 from supabase/or/delete_filters
Filter on `old_record` during deletes
- Loading branch information
Showing
13 changed files
with
659 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,301 @@ | ||
create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) | ||
returns setof realtime.wal_rls | ||
language plpgsql | ||
volatile | ||
as $$ | ||
declare | ||
-- Regclass of the table e.g. public.notes | ||
entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; | ||
|
||
-- I, U, D, T: insert, update ... | ||
action realtime.action = ( | ||
case wal ->> 'action' | ||
when 'I' then 'INSERT' | ||
when 'U' then 'UPDATE' | ||
when 'D' then 'DELETE' | ||
else 'ERROR' | ||
end | ||
); | ||
|
||
-- Is row level security enabled for the table | ||
is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; | ||
|
||
subscriptions realtime.subscription[] = array_agg(subs) | ||
from | ||
realtime.subscription subs | ||
where | ||
subs.entity = entity_; | ||
|
||
-- Subscription vars | ||
roles regrole[] = array_agg(distinct us.claims_role::text) | ||
from | ||
unnest(subscriptions) us; | ||
|
||
working_role regrole; | ||
claimed_role regrole; | ||
claims jsonb; | ||
|
||
subscription_id uuid; | ||
subscription_has_access bool; | ||
visible_to_subscription_ids uuid[] = '{}'; | ||
|
||
-- structured info for wal's columns | ||
columns realtime.wal_column[]; | ||
-- previous identity values for update/delete | ||
old_columns realtime.wal_column[]; | ||
|
||
error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; | ||
|
||
-- Primary jsonb output for record | ||
output jsonb; | ||
|
||
begin | ||
perform set_config('role', null, true); | ||
|
||
columns = | ||
array_agg( | ||
( | ||
x->>'name', | ||
x->>'type', | ||
x->>'typeoid', | ||
realtime.cast( | ||
(x->'value') #>> '{}', | ||
coalesce( | ||
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 | ||
(x->>'type')::regtype | ||
) | ||
), | ||
(pks ->> 'name') is not null, | ||
true | ||
)::realtime.wal_column | ||
) | ||
from | ||
jsonb_array_elements(wal -> 'columns') x | ||
left join jsonb_array_elements(wal -> 'pk') pks | ||
on (x ->> 'name') = (pks ->> 'name'); | ||
|
||
old_columns = | ||
array_agg( | ||
( | ||
x->>'name', | ||
x->>'type', | ||
x->>'typeoid', | ||
realtime.cast( | ||
(x->'value') #>> '{}', | ||
coalesce( | ||
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 | ||
(x->>'type')::regtype | ||
) | ||
), | ||
(pks ->> 'name') is not null, | ||
true | ||
)::realtime.wal_column | ||
) | ||
from | ||
jsonb_array_elements(wal -> 'identity') x | ||
left join jsonb_array_elements(wal -> 'pk') pks | ||
on (x ->> 'name') = (pks ->> 'name'); | ||
|
||
for working_role in select * from unnest(roles) loop | ||
|
||
-- Update `is_selectable` for columns and old_columns | ||
columns = | ||
array_agg( | ||
( | ||
c.name, | ||
c.type_name, | ||
c.type_oid, | ||
c.value, | ||
c.is_pkey, | ||
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') | ||
)::realtime.wal_column | ||
) | ||
from | ||
unnest(columns) c; | ||
|
||
old_columns = | ||
array_agg( | ||
( | ||
c.name, | ||
c.type_name, | ||
c.type_oid, | ||
c.value, | ||
c.is_pkey, | ||
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') | ||
)::realtime.wal_column | ||
) | ||
from | ||
unnest(old_columns) c; | ||
|
||
if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then | ||
return next ( | ||
jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action | ||
), | ||
is_rls_enabled, | ||
-- subscriptions is already filtered by entity | ||
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), | ||
array['Error 400: Bad Request, no primary key'] | ||
)::realtime.wal_rls; | ||
|
||
-- The claims role does not have SELECT permission to the primary key of entity | ||
elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then | ||
return next ( | ||
jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action | ||
), | ||
is_rls_enabled, | ||
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), | ||
array['Error 401: Unauthorized'] | ||
)::realtime.wal_rls; | ||
|
||
else | ||
output = jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action, | ||
'commit_timestamp', to_char( | ||
((wal ->> 'timestamp')::timestamptz at time zone 'utc'), | ||
'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' | ||
), | ||
'columns', ( | ||
select | ||
jsonb_agg( | ||
jsonb_build_object( | ||
'name', pa.attname, | ||
'type', pt.typname | ||
) | ||
order by pa.attnum asc | ||
) | ||
from | ||
pg_attribute pa | ||
join pg_type pt | ||
on pa.atttypid = pt.oid | ||
where | ||
attrelid = entity_ | ||
and attnum > 0 | ||
and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') | ||
) | ||
) | ||
-- Add "record" key for insert and update | ||
|| case | ||
when action in ('INSERT', 'UPDATE') then | ||
jsonb_build_object( | ||
'record', | ||
( | ||
select | ||
jsonb_object_agg( | ||
-- if unchanged toast, get column name and value from old record | ||
coalesce((c).name, (oc).name), | ||
case | ||
when (c).name is null then (oc).value | ||
else (c).value | ||
end | ||
) | ||
from | ||
unnest(columns) c | ||
full outer join unnest(old_columns) oc | ||
on (c).name = (oc).name | ||
where | ||
coalesce((c).is_selectable, (oc).is_selectable) | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
) | ||
) | ||
else '{}'::jsonb | ||
end | ||
-- Add "old_record" key for update and delete | ||
|| case | ||
when action = 'UPDATE' then | ||
jsonb_build_object( | ||
'old_record', | ||
( | ||
select jsonb_object_agg((c).name, (c).value) | ||
from unnest(old_columns) c | ||
where | ||
(c).is_selectable | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
) | ||
) | ||
when action = 'DELETE' then | ||
jsonb_build_object( | ||
'old_record', | ||
( | ||
select jsonb_object_agg((c).name, (c).value) | ||
from unnest(old_columns) c | ||
where | ||
(c).is_selectable | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey | ||
) | ||
) | ||
else '{}'::jsonb | ||
end; | ||
|
||
-- Create the prepared statement | ||
if is_rls_enabled and action <> 'DELETE' then | ||
if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then | ||
deallocate walrus_rls_stmt; | ||
end if; | ||
execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); | ||
end if; | ||
|
||
visible_to_subscription_ids = '{}'; | ||
|
||
for subscription_id, claims in ( | ||
select | ||
subs.subscription_id, | ||
subs.claims | ||
from | ||
unnest(subscriptions) subs | ||
where | ||
subs.entity = entity_ | ||
and subs.claims_role = working_role | ||
and ( | ||
realtime.is_visible_through_filters(columns, subs.filters) | ||
or ( | ||
action = 'DELETE' | ||
and realtime.is_visible_through_filters(old_columns, subs.filters) | ||
) | ||
) | ||
) loop | ||
|
||
if not is_rls_enabled or action = 'DELETE' then | ||
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; | ||
else | ||
-- Check if RLS allows the role to see the record | ||
perform | ||
-- Trim leading and trailing quotes from working_role because set_config | ||
-- doesn't recognize the role as valid if they are included | ||
set_config('role', trim(both '"' from working_role::text), true), | ||
set_config('request.jwt.claims', claims::text, true); | ||
|
||
execute 'execute walrus_rls_stmt' into subscription_has_access; | ||
|
||
if subscription_has_access then | ||
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; | ||
end if; | ||
end if; | ||
end loop; | ||
|
||
perform set_config('role', null, true); | ||
|
||
return next ( | ||
output, | ||
is_rls_enabled, | ||
visible_to_subscription_ids, | ||
case | ||
when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] | ||
else '{}' | ||
end | ||
)::realtime.wal_rls; | ||
|
||
end if; | ||
end loop; | ||
|
||
perform set_config('role', null, true); | ||
end; | ||
$$; |
Oops, something went wrong.