-
-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Filter on old_record
during deletes
#76
Merged
Merged
Changes from 3 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
9a8c11e
delete filters
olirice 30cd143
re-align test output
olirice ab83bd4
set replica identity full in older tests to reduce diff
olirice cc83f03
remove latest docker tag
olirice c95e397
bump to 15
olirice a38326c
15 to all
olirice 309fbbf
rm entirely
olirice a8dc878
pin lib versions
olirice 7a050c4
add apt repo
olirice 1b40acf
fix: quick check if add to path helps
egor-romanov 36a5987
enumerate system catalog column names to avoid diffs
olirice File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,301 @@ | ||
create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024) | ||
returns setof realtime.wal_rls | ||
language plpgsql | ||
volatile | ||
as $$ | ||
declare | ||
-- Regclass of the table e.g. public.notes | ||
entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass; | ||
|
||
-- I, U, D, T: insert, update ... | ||
action realtime.action = ( | ||
case wal ->> 'action' | ||
when 'I' then 'INSERT' | ||
when 'U' then 'UPDATE' | ||
when 'D' then 'DELETE' | ||
else 'ERROR' | ||
end | ||
); | ||
|
||
-- Is row level security enabled for the table | ||
is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; | ||
|
||
subscriptions realtime.subscription[] = array_agg(subs) | ||
from | ||
realtime.subscription subs | ||
where | ||
subs.entity = entity_; | ||
|
||
-- Subscription vars | ||
roles regrole[] = array_agg(distinct us.claims_role::text) | ||
from | ||
unnest(subscriptions) us; | ||
|
||
working_role regrole; | ||
claimed_role regrole; | ||
claims jsonb; | ||
|
||
subscription_id uuid; | ||
subscription_has_access bool; | ||
visible_to_subscription_ids uuid[] = '{}'; | ||
|
||
-- structured info for wal's columns | ||
columns realtime.wal_column[]; | ||
-- previous identity values for update/delete | ||
old_columns realtime.wal_column[]; | ||
|
||
error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; | ||
|
||
-- Primary jsonb output for record | ||
output jsonb; | ||
|
||
begin | ||
perform set_config('role', null, true); | ||
|
||
columns = | ||
array_agg( | ||
( | ||
x->>'name', | ||
x->>'type', | ||
x->>'typeoid', | ||
realtime.cast( | ||
(x->'value') #>> '{}', | ||
coalesce( | ||
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 | ||
(x->>'type')::regtype | ||
) | ||
), | ||
(pks ->> 'name') is not null, | ||
true | ||
)::realtime.wal_column | ||
) | ||
from | ||
jsonb_array_elements(wal -> 'columns') x | ||
left join jsonb_array_elements(wal -> 'pk') pks | ||
on (x ->> 'name') = (pks ->> 'name'); | ||
|
||
old_columns = | ||
array_agg( | ||
( | ||
x->>'name', | ||
x->>'type', | ||
x->>'typeoid', | ||
realtime.cast( | ||
(x->'value') #>> '{}', | ||
coalesce( | ||
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 | ||
(x->>'type')::regtype | ||
) | ||
), | ||
(pks ->> 'name') is not null, | ||
true | ||
)::realtime.wal_column | ||
) | ||
from | ||
jsonb_array_elements(wal -> 'identity') x | ||
left join jsonb_array_elements(wal -> 'pk') pks | ||
on (x ->> 'name') = (pks ->> 'name'); | ||
|
||
for working_role in select * from unnest(roles) loop | ||
|
||
-- Update `is_selectable` for columns and old_columns | ||
columns = | ||
array_agg( | ||
( | ||
c.name, | ||
c.type_name, | ||
c.type_oid, | ||
c.value, | ||
c.is_pkey, | ||
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') | ||
)::realtime.wal_column | ||
) | ||
from | ||
unnest(columns) c; | ||
|
||
old_columns = | ||
array_agg( | ||
( | ||
c.name, | ||
c.type_name, | ||
c.type_oid, | ||
c.value, | ||
c.is_pkey, | ||
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') | ||
)::realtime.wal_column | ||
) | ||
from | ||
unnest(old_columns) c; | ||
|
||
if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then | ||
return next ( | ||
jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action | ||
), | ||
is_rls_enabled, | ||
-- subscriptions is already filtered by entity | ||
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), | ||
array['Error 400: Bad Request, no primary key'] | ||
)::realtime.wal_rls; | ||
|
||
-- The claims role does not have SELECT permission to the primary key of entity | ||
elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then | ||
return next ( | ||
jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action | ||
), | ||
is_rls_enabled, | ||
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), | ||
array['Error 401: Unauthorized'] | ||
)::realtime.wal_rls; | ||
|
||
else | ||
output = jsonb_build_object( | ||
'schema', wal ->> 'schema', | ||
'table', wal ->> 'table', | ||
'type', action, | ||
'commit_timestamp', to_char( | ||
((wal ->> 'timestamp')::timestamptz at time zone 'utc'), | ||
'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' | ||
), | ||
'columns', ( | ||
select | ||
jsonb_agg( | ||
jsonb_build_object( | ||
'name', pa.attname, | ||
'type', pt.typname | ||
) | ||
order by pa.attnum asc | ||
) | ||
from | ||
pg_attribute pa | ||
join pg_type pt | ||
on pa.atttypid = pt.oid | ||
where | ||
attrelid = entity_ | ||
and attnum > 0 | ||
and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') | ||
) | ||
) | ||
-- Add "record" key for insert and update | ||
|| case | ||
when action in ('INSERT', 'UPDATE') then | ||
jsonb_build_object( | ||
'record', | ||
( | ||
select | ||
jsonb_object_agg( | ||
-- if unchanged toast, get column name and value from old record | ||
coalesce((c).name, (oc).name), | ||
case | ||
when (c).name is null then (oc).value | ||
else (c).value | ||
end | ||
) | ||
from | ||
unnest(columns) c | ||
full outer join unnest(old_columns) oc | ||
on (c).name = (oc).name | ||
where | ||
coalesce((c).is_selectable, (oc).is_selectable) | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
) | ||
) | ||
else '{}'::jsonb | ||
end | ||
-- Add "old_record" key for update and delete | ||
|| case | ||
when action = 'UPDATE' then | ||
jsonb_build_object( | ||
'old_record', | ||
( | ||
select jsonb_object_agg((c).name, (c).value) | ||
from unnest(old_columns) c | ||
where | ||
(c).is_selectable | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
) | ||
) | ||
when action = 'DELETE' then | ||
jsonb_build_object( | ||
'old_record', | ||
( | ||
select jsonb_object_agg((c).name, (c).value) | ||
from unnest(old_columns) c | ||
where | ||
(c).is_selectable | ||
and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) | ||
and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey | ||
) | ||
) | ||
else '{}'::jsonb | ||
end; | ||
|
||
-- Create the prepared statement | ||
if is_rls_enabled and action <> 'DELETE' then | ||
if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then | ||
deallocate walrus_rls_stmt; | ||
end if; | ||
execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); | ||
end if; | ||
|
||
visible_to_subscription_ids = '{}'; | ||
|
||
for subscription_id, claims in ( | ||
select | ||
subs.subscription_id, | ||
subs.claims | ||
from | ||
unnest(subscriptions) subs | ||
where | ||
subs.entity = entity_ | ||
and subs.claims_role = working_role | ||
and ( | ||
realtime.is_visible_through_filters(columns, subs.filters) | ||
or ( | ||
action = 'DELETE' | ||
and realtime.is_visible_through_filters(old_columns, subs.filters) | ||
) | ||
) | ||
) loop | ||
|
||
if not is_rls_enabled or action = 'DELETE' then | ||
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; | ||
else | ||
-- Check if RLS allows the role to see the record | ||
perform | ||
-- Trim leading and trailing quotes from working_role because set_config | ||
-- doesn't recognize the role as valid if they are included | ||
set_config('role', trim(both '"' from working_role::text), true), | ||
set_config('request.jwt.claims', claims::text, true); | ||
|
||
execute 'execute walrus_rls_stmt' into subscription_has_access; | ||
|
||
if subscription_has_access then | ||
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; | ||
end if; | ||
end if; | ||
end loop; | ||
|
||
perform set_config('role', null, true); | ||
|
||
return next ( | ||
output, | ||
is_rls_enabled, | ||
visible_to_subscription_ids, | ||
case | ||
when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] | ||
else '{}' | ||
end | ||
)::realtime.wal_rls; | ||
|
||
end if; | ||
end loop; | ||
|
||
perform set_config('role', null, true); | ||
end; | ||
$$; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the change