From 5498dfd7d69b52925ed2beac12c00c4aa81202db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Mon, 9 Dec 2024 11:50:30 +0100 Subject: [PATCH 1/3] use faster "EXIST" instead of NOT IN --- server/pg/query.sql | 18 +++++++++--------- server/pg/query.sql.go | 22 +++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/server/pg/query.sql b/server/pg/query.sql index 2fedc9b..0f37083 100644 --- a/server/pg/query.sql +++ b/server/pg/query.sql @@ -39,29 +39,29 @@ DELETE FROM closures WHERE updated_at < $1; WITH ct AS ( SELECT timezone('UTC', now()) AS now ), + stale_objects AS ( SELECT o.key FROM objects AS o, ct WHERE - o.key NOT IN ( - SELECT co.object_key + NOT EXISTS ( + SELECT 1 FROM closure_objects AS co WHERE co.object_key = o.key ) - AND ( - o.key NOT IN ( - SELECT po.key - FROM pending_objects AS po - WHERE po.key = o.key - ) + AND NOT EXISTS ( + SELECT 1 + FROM pending_objects AS po + WHERE po.key = o.key ) AND ( o.deleted_at IS NULL - OR o.deleted_at < ct.now - INTERVAL '1 hour' + OR o.deleted_at < ct.now - interval '1 hour' ) FOR UPDATE LIMIT $1 ) + UPDATE objects SET deleted_at = ct.now FROM stale_objects, ct diff --git a/server/pg/query.sql.go b/server/pg/query.sql.go index cfe2fd8..fa3d0ee 100644 --- a/server/pg/query.sql.go +++ b/server/pg/query.sql.go @@ -151,33 +151,33 @@ func (q *Queries) MarkObjectsAsActive(ctx context.Context, dollar_1 []string) er const markObjectsForDeletion = `-- name: MarkObjectsForDeletion :many WITH ct AS ( - SELECT timezone('UTC', now()) AS current_utc + SELECT timezone('UTC', now()) AS now ), + stale_objects AS ( SELECT o.key FROM objects AS o, ct WHERE - o.key NOT IN ( - SELECT co.object_key + NOT EXISTS ( + SELECT 1 FROM closure_objects AS co WHERE co.object_key = o.key ) - AND ( - o.key NOT IN ( - SELECT po.key - FROM pending_objects AS po - WHERE po.key = o.key - ) + AND NOT EXISTS ( + SELECT 1 + FROM pending_objects AS po + WHERE po.key = o.key ) AND ( o.deleted_at IS NULL - OR o.deleted_at < ct.current_utc - INTERVAL '1 hour' + OR o.deleted_at < ct.now - interval '1 hour' ) FOR UPDATE LIMIT $1 ) + UPDATE objects -SET deleted_at = ct.current_utc +SET deleted_at = ct.now FROM stale_objects, ct WHERE objects.key = stale_objects.key RETURNING objects.key From 87b891eceb7c53ebb9fa12d51870cd9448779923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Mon, 9 Dec 2024 12:23:25 +0100 Subject: [PATCH 2/3] commit_pending_closure: use CTE to only query pending keys once also compute current time only once. --- .../pg/functions/1_commit_pending_closure.sql | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/server/pg/functions/1_commit_pending_closure.sql b/server/pg/functions/1_commit_pending_closure.sql index 343e556..a5d6821 100644 --- a/server/pg/functions/1_commit_pending_closure.sql +++ b/server/pg/functions/1_commit_pending_closure.sql @@ -5,14 +5,14 @@ CREATE OR REPLACE FUNCTION commit_pending_closure(closure_id bigint) RETURNS void AS $$ DECLARE is_inserted BOOLEAN; - inserted_count INTEGER; closure_key VARCHAR; + now timestamp without time zone := timezone('UTC', now()); BEGIN -- Commit the pending closure and capture the inserted value INSERT INTO closures (updated_at, key) - SELECT timezone('UTC', NOW()), key FROM pending_closures WHERE id = closure_id + SELECT now, key FROM pending_closures WHERE id = closure_id ON CONFLICT (key) - DO UPDATE SET updated_at = timezone('UTC', NOW()) + DO UPDATE SET updated_at = now RETURNING (xmax = 0) AS is_inserted, key AS closure_key INTO is_inserted, closure_key; @@ -22,15 +22,20 @@ BEGIN -- If the closure was inserted, commit the pending objects IF is_inserted THEN - -- Commit the pending objects that we don't already have - INSERT INTO objects (key) - SELECT key FROM pending_objects WHERE pending_closure_id = closure_id - ON CONFLICT (key) - DO NOTHING; - - -- Commit the pending objects closure + -- Commit the pending objects that we don't already have and the corresponding closure_objects + WITH pending_keys AS ( + SELECT key + FROM pending_objects + WHERE pending_closure_id = closure_id + ), insert_objects AS ( + INSERT INTO objects (key) + SELECT key FROM pending_keys + ON CONFLICT (key) DO NOTHING + RETURNING key + ) INSERT INTO closure_objects (closure_key, object_key) - SELECT closure_key, key FROM pending_objects WHERE pending_closure_id = closure_id; + SELECT closure_key, key + FROM pending_keys; END IF; -- Delete the pending objects From fab70c6cd2e4a49b94c00981e4b582d4fddedc23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Mon, 9 Dec 2024 12:37:14 +0100 Subject: [PATCH 3/3] cleanup_pending_closure: make it a query using a cte instead --- .../functions/2_cleanup_pending_closure.sql | 34 ----------------- .../20241026095416_initial_model.sql | 4 +- server/pg/query.sql | 37 ++++++++++++++++++- server/pg/query.sql.go | 33 ++++++++++++++++- 4 files changed, 70 insertions(+), 38 deletions(-) delete mode 100644 server/pg/functions/2_cleanup_pending_closure.sql diff --git a/server/pg/functions/2_cleanup_pending_closure.sql b/server/pg/functions/2_cleanup_pending_closure.sql deleted file mode 100644 index 1f59748..0000000 --- a/server/pg/functions/2_cleanup_pending_closure.sql +++ /dev/null @@ -1,34 +0,0 @@ --- +goose up - --- +goose statementbegin -CREATE OR REPLACE FUNCTION cleanup_pending_closures( - seconds int -) RETURNS void AS $$ -BEGIN - -- Create a temporary view for old_pending_closures - EXECUTE format(' - CREATE TEMP VIEW temp_old_pending_closures AS - SELECT id - FROM pending_closures - WHERE started_at < NOW() - interval ''1 second'' * %L', seconds); - - -- Insert pending objects into objects table if they don't already exist - INSERT INTO objects (key, deleted_at) - SELECT po.key, NOW() - FROM pending_objects po - JOIN temp_old_pending_closures opc ON po.pending_closure_id = opc.id - ON CONFLICT (key) DO NOTHING; - - -- Delete pending objects whose associated pending closures are older than the specified interval - DELETE FROM pending_objects - WHERE pending_closure_id IN (SELECT id FROM temp_old_pending_closures); - - -- Delete pending closures older than the specified interval - DELETE FROM pending_closures - WHERE id IN (SELECT id FROM temp_old_pending_closures); - - -- Drop the temporary view - DROP VIEW temp_old_pending_closures; -END; -$$ LANGUAGE plpgsql; --- +goose statementend diff --git a/server/pg/migrations/20241026095416_initial_model.sql b/server/pg/migrations/20241026095416_initial_model.sql index f32073e..e734383 100644 --- a/server/pg/migrations/20241026095416_initial_model.sql +++ b/server/pg/migrations/20241026095416_initial_model.sql @@ -53,11 +53,12 @@ CREATE TABLE pending_closures key varchar(1024) NOT NULL, started_at timestamp NOT NULL ); +CREATE INDEX pending_closures_started_at_idx ON pending_closures (started_at); -- This is where track not yet uploaded objects associated with a pending closure CREATE TABLE pending_objects ( - pending_closure_id bigint NOT NULL REFERENCES pending_closures (id), + pending_closure_id bigint NOT NULL REFERENCES pending_closures (id) ON DELETE CASCADE, key varchar(1024) NOT NULL, PRIMARY KEY (key, pending_closure_id) ); @@ -73,6 +74,7 @@ DROP INDEX closure_objects_closure_key_idx; DROP INDEX pending_objects_pending_closure_id_idx; DROP INDEX closure_objects_object_key_idx; DROP INDEX closures_updated_at_idx; +DROP INDEX pending_closures_started_at_idx; DROP TABLE closures; DROP TABLE objects; diff --git a/server/pg/query.sql b/server/pg/query.sql index 0f37083..baf0688 100644 --- a/server/pg/query.sql +++ b/server/pg/query.sql @@ -24,7 +24,42 @@ WHERE key = any($1::varchar []); SELECT commit_pending_closure($1::bigint); -- name: CleanupPendingClosures :exec -SELECT cleanup_pending_closures($1::int); +WITH cutoff_time AS ( + SELECT timezone('UTC', now()) - interval '1 second' * $1 AS time +), + +old_closures AS ( + SELECT id + FROM pending_closures, cutoff_time + WHERE started_at < cutoff_time.time +), + +-- Insert pending objects into objects table if they don't already exist +-- We mark them as deleted so they can be cleaned up later +inserted_objects AS ( + INSERT INTO objects (key, deleted_at) + SELECT + po.key, + cutoff_time.time + FROM pending_objects AS po + JOIN old_closures oc ON po.pending_closure_id = oc.id, cutoff_time + ON CONFLICT (key) DO NOTHING + RETURNING key +), + +-- Delete pending objects that were inserted into the objects table +deleted_pending_objects AS ( + DELETE FROM pending_objects + USING old_closures + WHERE pending_objects.pending_closure_id = old_closures.id + RETURNING pending_closure_id +) + +-- Delete pending closures older than the specified interval +-- This will cascade to pending_objects +DELETE FROM pending_closures +USING old_closures +WHERE pending_closures.id = old_closures.id; -- name: GetClosure :one SELECT updated_at FROM closures WHERE key = $1 LIMIT 1; diff --git a/server/pg/query.sql.go b/server/pg/query.sql.go index fa3d0ee..278273a 100644 --- a/server/pg/query.sql.go +++ b/server/pg/query.sql.go @@ -12,10 +12,39 @@ import ( ) const cleanupPendingClosures = `-- name: CleanupPendingClosures :exec -SELECT cleanup_pending_closures($1::int) +WITH cutoff_time AS ( + SELECT timezone('UTC', NOW()) - interval '1 second' * $1 AS time +), +old_closures AS ( + SELECT id + FROM pending_closures, cutoff_time + WHERE started_at < cutoff_time.time +), +inserted_objects AS ( + INSERT INTO objects (key, deleted_at) + SELECT po.key, cutoff_time.time + FROM pending_objects as po + JOIN old_closures oc ON po.pending_closure_id = oc.id, cutoff_time + ON CONFLICT (key) DO NOTHING + RETURNING key +), +deleted_pending_objects AS ( + DELETE FROM pending_objects + USING old_closures + WHERE pending_objects.pending_closure_id = old_closures.id + RETURNING pending_closure_id +) +DELETE FROM pending_closures +USING old_closures +WHERE pending_closures.id = old_closures.id ` -func (q *Queries) CleanupPendingClosures(ctx context.Context, dollar_1 int32) error { +// Insert pending objects into objects table if they don't already exist +// We mark them as deleted so they can be cleaned up later +// Delete pending objects that were inserted into the objects table +// Delete pending closures older than the specified interval +// This will cascade to pending_objects +func (q *Queries) CleanupPendingClosures(ctx context.Context, dollar_1 interface{}) error { _, err := q.db.Exec(ctx, cleanupPendingClosures, dollar_1) return err }