Skip to content

Commit

Permalink
Update SQL in backfill script for facet tables to improve performance…
Browse files Browse the repository at this point in the history
… on large installations

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed Mar 29, 2023
1 parent 60581e3 commit 514c2d5
Showing 1 changed file with 57 additions and 92 deletions.
149 changes: 57 additions & 92 deletions api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,72 +23,57 @@ public class V57_1__BackfillFacets implements JavaMigration {

private static int BASIC_MIGRATION_LIMIT = 100000;

private static final String GET_CURRENT_LOCK_SQL =
private static final String CREATE_TEMP_EVENT_RUNS_TABLE =
"""
SELECT * FROM facet_migration_lock
ORDER BY created_at ASC, run_uuid ASC
LIMIT 1
""";

private static final String GET_FINISHING_LOCK_SQL =
"""
SELECT run_uuid, created_at FROM lineage_events
ORDER BY
COALESCE(created_at, event_time) ASC,
run_uuid ASC
LIMIT 1
""";
CREATE TEMP TABLE lineage_event_runs AS
SELECT DISTINCT ON (run_uuid) run_uuid,
COALESCE(created_at, event_time) AS created_at
FROM lineage_events
""";

private static final String GET_INITIAL_LOCK_SQL =
private static final String CREATE_INDEX_EVENT_RUNS_TABLE =
"""
SELECT
run_uuid,
COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
""";
CREATE INDEX ON lineage_event_runs (created_at DESC) INCLUDE (run_uuid)
""";

private static final String COUNT_LINEAGE_EVENTS_SQL =
"""
SELECT count(*) as cnt FROM lineage_events
""";

private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
"""
SELECT count(*) as cnt FROM lineage_events e
WHERE
COALESCE(e.created_at, e.event_time) < :createdAt
OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events';
""";

private String getBackFillFacetsSQL() {
return String.format(
"""
WITH events_chunk AS (
SELECT e.* FROM lineage_events e
WHERE
COALESCE(e.created_at, e.event_time) < :createdAt
OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
LIMIT :chunkSize
),
insert_datasets AS (
INSERT INTO dataset_facets %s
),
insert_runs AS (
INSERT INTO run_facets %s
),
insert_jobs AS (
INSERT INTO job_facets %s
)
INSERT INTO facet_migration_lock
SELECT events_chunk.created_at, events_chunk.run_uuid
FROM events_chunk
ORDER BY
COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
events_chunk.run_uuid ASC
LIMIT 1
RETURNING created_at, run_uuid;
""",
WITH queued_runs AS (
SELECT created_at, run_uuid
FROM lineage_event_runs
ORDER BY created_at DESC, run_uuid
LIMIT :chunkSize
),
processed_runs AS (
DELETE FROM lineage_event_runs
USING queued_runs qe
WHERE lineage_event_runs.run_uuid=qe.run_uuid
RETURNING lineage_event_runs.run_uuid
),
events_chunk AS (
SELECT e.*
FROM lineage_events e
WHERE run_uuid IN (SELECT run_uuid FROM processed_runs)
),
insert_datasets AS (
INSERT INTO dataset_facets %s
),
insert_runs AS (
INSERT INTO run_facets %s
),
insert_jobs AS (
INSERT INTO job_facets %s
)
INSERT INTO facet_migration_lock
SELECT events_chunk.created_at, events_chunk.run_uuid
FROM events_chunk
""",
V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"),
V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"),
V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk"));
Expand Down Expand Up @@ -140,17 +125,17 @@ public void migrate(Context context) throws Exception {
jdbi = Jdbi.create(context.getConnection());
}

if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
int estimatedEventsCount = countLineageEvents();
log.info("Estimating {} events in lineage_events table", estimatedEventsCount);
if (estimatedEventsCount == 0) {
// lineage_events table is empty -> no need to run migration
// anyway. we need to create lock to mark that no data requires migration
execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)");

createTargetViews();
return;
}
Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);

if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
if (!manual && estimatedEventsCount >= BASIC_MIGRATION_LIMIT) {
log.warn(
"""
==================================================
Expand All @@ -168,13 +153,17 @@ public void migrate(Context context) throws Exception {
return;
}

jdbi.withHandle(h -> h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute());
jdbi.withHandle(h -> h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute());

log.info("Configured chunkSize is {}", getChunkSize());
MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get());
while (!lock.equals(lastExpectedLock.get())) {
lock = backFillChunk(lock);
log.info(
"Migrating chunk finished. Still having {} records to migrate.",
countLineageEventsToProcess(lock));
boolean doMigration = true;
while (doMigration) {
int results = backFillChunk();
log.info("Migrating chunk finished processing {} records.", results);
if (results < 1) {
doMigration = false;
}
}

createTargetViews();
Expand All @@ -195,20 +184,10 @@ private void execute(String sql) {
jdbi.inTransaction(handle -> handle.execute(sql));
}

private MigrationLock backFillChunk(MigrationLock lock) {
private int backFillChunk() {
String backFillQuery = getBackFillFacetsSQL();
return jdbi.withHandle(
h ->
h.createQuery(backFillQuery)
.bind("chunkSize", getChunkSize())
.bind("createdAt", lock.created_at)
.bind("runUuid", lock.run_uuid)
.map(
rs ->
new MigrationLock(
rs.getColumn(Columns.RUN_UUID, UUID.class),
rs.getColumn(Columns.CREATED_AT, Instant.class)))
.one());
h -> h.createUpdate(backFillQuery).bind("chunkSize", getChunkSize()).execute());
}

private Optional<MigrationLock> getLock(String sql) {
Expand All @@ -224,21 +203,7 @@ private Optional<MigrationLock> getLock(String sql) {
}

private int countLineageEvents() {
return jdbi.withHandle(
h ->
h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
.map(rs -> rs.getColumn("cnt", Integer.class))
.one());
}

private int countLineageEventsToProcess(MigrationLock lock) {
return jdbi.withHandle(
h ->
h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
.bind("createdAt", lock.created_at)
.bind("runUuid", lock.run_uuid)
.map(rs -> rs.getColumn("cnt", Integer.class))
.one());
return jdbi.withHandle(h -> h.createQuery(COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one());
}

private record MigrationLock(UUID run_uuid, Instant created_at) {}
Expand Down

0 comments on commit 514c2d5

Please sign in to comment.