Skip to content

Commit

Permalink
Merge pull request #32 from Mic92/push-nvwosookxovy
Browse files Browse the repository at this point in the history
Optimize pending closure queries
  • Loading branch information
Mic92 authored Dec 9, 2024
2 parents d91ab21 + fab70c6 commit a1a350f
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 60 deletions.
27 changes: 16 additions & 11 deletions server/pg/functions/1_commit_pending_closure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ CREATE OR REPLACE FUNCTION commit_pending_closure(closure_id bigint)
RETURNS void AS $$
DECLARE
is_inserted BOOLEAN;
inserted_count INTEGER;
closure_key VARCHAR;
now timestamp without time zone := timezone('UTC', now());
BEGIN
-- Commit the pending closure and capture the inserted value
INSERT INTO closures (updated_at, key)
SELECT timezone('UTC', NOW()), key FROM pending_closures WHERE id = closure_id
SELECT now, key FROM pending_closures WHERE id = closure_id
ON CONFLICT (key)
DO UPDATE SET updated_at = timezone('UTC', NOW())
DO UPDATE SET updated_at = now
RETURNING (xmax = 0) AS is_inserted, key AS closure_key
INTO is_inserted, closure_key;

Expand All @@ -22,15 +22,20 @@ BEGIN

-- If the closure was inserted, commit the pending objects
IF is_inserted THEN
-- Commit the pending objects that we don't already have
INSERT INTO objects (key)
SELECT key FROM pending_objects WHERE pending_closure_id = closure_id
ON CONFLICT (key)
DO NOTHING;

-- Commit the pending objects closure
-- Commit the pending objects that we don't already have and the corresponding closure_objects
WITH pending_keys AS (
SELECT key
FROM pending_objects
WHERE pending_closure_id = closure_id
), insert_objects AS (
INSERT INTO objects (key)
SELECT key FROM pending_keys
ON CONFLICT (key) DO NOTHING
RETURNING key
)
INSERT INTO closure_objects (closure_key, object_key)
SELECT closure_key, key FROM pending_objects WHERE pending_closure_id = closure_id;
SELECT closure_key, key
FROM pending_keys;
END IF;

-- Delete the pending objects
Expand Down
34 changes: 0 additions & 34 deletions server/pg/functions/2_cleanup_pending_closure.sql

This file was deleted.

4 changes: 3 additions & 1 deletion server/pg/migrations/20241026095416_initial_model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ CREATE TABLE pending_closures
key varchar(1024) NOT NULL,
started_at timestamp NOT NULL
);
CREATE INDEX pending_closures_started_at_idx ON pending_closures (started_at);

-- This is where track not yet uploaded objects associated with a pending closure
CREATE TABLE pending_objects
(
pending_closure_id bigint NOT NULL REFERENCES pending_closures (id),
pending_closure_id bigint NOT NULL REFERENCES pending_closures (id) ON DELETE CASCADE,
key varchar(1024) NOT NULL,
PRIMARY KEY (key, pending_closure_id)
);
Expand All @@ -73,6 +74,7 @@ DROP INDEX closure_objects_closure_key_idx;
DROP INDEX pending_objects_pending_closure_id_idx;
DROP INDEX closure_objects_object_key_idx;
DROP INDEX closures_updated_at_idx;
DROP INDEX pending_closures_started_at_idx;

DROP TABLE closures;
DROP TABLE objects;
Expand Down
37 changes: 36 additions & 1 deletion server/pg/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,42 @@ WHERE key = any($1::varchar []);
SELECT commit_pending_closure($1::bigint);

-- name: CleanupPendingClosures :exec
SELECT cleanup_pending_closures($1::int);
WITH cutoff_time AS (
SELECT timezone('UTC', now()) - interval '1 second' * $1 AS time
),

old_closures AS (
SELECT id
FROM pending_closures, cutoff_time
WHERE started_at < cutoff_time.time
),

-- Insert pending objects into objects table if they don't already exist
-- We mark them as deleted so they can be cleaned up later
inserted_objects AS (
INSERT INTO objects (key, deleted_at)
SELECT
po.key,
cutoff_time.time
FROM pending_objects AS po
JOIN old_closures oc ON po.pending_closure_id = oc.id, cutoff_time
ON CONFLICT (key) DO NOTHING
RETURNING key
),

-- Delete pending objects that were inserted into the objects table
deleted_pending_objects AS (
DELETE FROM pending_objects
USING old_closures
WHERE pending_objects.pending_closure_id = old_closures.id
RETURNING pending_closure_id
)

-- Delete pending closures older than the specified interval
-- This will cascade to pending_objects
DELETE FROM pending_closures
USING old_closures
WHERE pending_closures.id = old_closures.id;

-- name: GetClosure :one
SELECT updated_at FROM closures WHERE key = $1 LIMIT 1;
Expand Down
55 changes: 42 additions & 13 deletions server/pg/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a1a350f

Please sign in to comment.