Update SQL in backfill script for facet tables to improve performance on large installations (MarquezProject#2461)

Signed-off-by: Michael Collado <collado.mike@gmail.com>
Signed-off-by: Xavier-Cliquennois <xavier.cliquennois@wearegraphite.io>
collado-mike authored and Xavier-Cliquennois committed Jul 26, 2023
1 parent 4410d63 commit b6811b1
Showing 2 changed files with 68 additions and 146 deletions.
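
In short, the old script resumed from a facet_migration_lock cursor and re-sorted lineage_events on COALESCE(created_at, event_time) for every chunk, an ordering no plain index can serve; the new script snapshots the distinct run IDs into an indexed temp table and claims each chunk with DELETE ... USING ... RETURNING, so each chunk is an ordered pop and no row is scanned twice. A minimal sketch of that claiming idiom in the file's own style follows; the constant name CLAIM_CHUNK_SQL is illustrative rather than part of the commit, and :chunkSize is a Jdbi bind parameter.

  // Sketch only: pop the newest :chunkSize runs off the temp work queue,
  // deleting them in the same statement so no run can be claimed twice.
  private static final String CLAIM_CHUNK_SQL =
      """
      WITH queued_runs AS (
          SELECT created_at, run_uuid
          FROM lineage_event_runs
          ORDER BY created_at DESC, run_uuid
          LIMIT :chunkSize
      )
      DELETE FROM lineage_event_runs
      USING queued_runs qe
      WHERE lineage_event_runs.run_uuid = qe.run_uuid
      RETURNING lineage_event_runs.run_uuid
      """;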
176 changes: 68 additions & 108 deletions api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
@@ -5,15 +5,12 @@
 
 package marquez.db.migrations;
 
-import java.time.Instant;
-import java.util.Optional;
-import java.util.UUID;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import marquez.db.Columns;
 import org.flywaydb.core.api.MigrationVersion;
 import org.flywaydb.core.api.migration.Context;
 import org.flywaydb.core.api.migration.JavaMigration;
+import org.jdbi.v3.core.Handle;
 import org.jdbi.v3.core.Jdbi;
 
 @Slf4j
@@ -23,72 +20,62 @@ public class V57_1__BackfillFacets implements JavaMigration {
 
   private static int BASIC_MIGRATION_LIMIT = 100000;
 
-  private static final String GET_CURRENT_LOCK_SQL =
+  private static final String CREATE_TEMP_EVENT_RUNS_TABLE =
       """
-      SELECT * FROM facet_migration_lock
-      ORDER BY created_at ASC, run_uuid ASC
-      LIMIT 1
-      """;
+      CREATE TEMP TABLE lineage_event_runs AS
+      SELECT DISTINCT ON (run_uuid) run_uuid,
+          COALESCE(created_at, event_time) AS created_at
+      FROM lineage_events
+      """;
 
-  private static final String GET_FINISHING_LOCK_SQL =
+  private static final String CREATE_INDEX_EVENT_RUNS_TABLE =
       """
-      SELECT run_uuid, created_at FROM lineage_events
-      ORDER BY
-        COALESCE(created_at, event_time) ASC,
-        run_uuid ASC
-      LIMIT 1
-      """;
-
-  private static final String GET_INITIAL_LOCK_SQL =
-      """
-      SELECT
-        run_uuid,
-        COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
-      FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
-      """;
+      CREATE INDEX ON lineage_event_runs (created_at DESC) INCLUDE (run_uuid)
+      """;
 
   private static final String COUNT_LINEAGE_EVENTS_SQL =
       """
-      SELECT count(*) as cnt FROM lineage_events
-      """;
+      SELECT COUNT(*) FROM lineage_events;
+      """;
 
-  private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
+  private static final String ESTIMATE_COUNT_LINEAGE_EVENTS_SQL =
       """
-      SELECT count(*) as cnt FROM lineage_events e
-      WHERE
-        COALESCE(e.created_at, e.event_time) < :createdAt
-        OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
+      SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events';
       """;
 
   private String getBackFillFacetsSQL() {
     return String.format(
         """
-        WITH events_chunk AS (
-          SELECT e.* FROM lineage_events e
-          WHERE
-            COALESCE(e.created_at, e.event_time) < :createdAt
-            OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
-          ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
-          LIMIT :chunkSize
-        ),
-        insert_datasets AS (
-          INSERT INTO dataset_facets %s
-        ),
-        insert_runs AS (
-          INSERT INTO run_facets %s
-        ),
-        insert_jobs AS (
-          INSERT INTO job_facets %s
-        )
-        INSERT INTO facet_migration_lock
-        SELECT events_chunk.created_at, events_chunk.run_uuid
-        FROM events_chunk
-        ORDER BY
-          COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
-          events_chunk.run_uuid ASC
-        LIMIT 1
-        RETURNING created_at, run_uuid;
-        """,
+        WITH queued_runs AS (
+            SELECT created_at, run_uuid
+            FROM lineage_event_runs
+            ORDER BY created_at DESC, run_uuid
+            LIMIT :chunkSize
+        ),
+        processed_runs AS (
+            DELETE FROM lineage_event_runs
+            USING queued_runs qe
+            WHERE lineage_event_runs.run_uuid=qe.run_uuid
+            RETURNING lineage_event_runs.run_uuid
+        ),
+        events_chunk AS (
+            SELECT e.*
+            FROM lineage_events e
+            WHERE run_uuid IN (SELECT run_uuid FROM processed_runs)
+        ),
+        insert_datasets AS (
+            INSERT INTO dataset_facets %s
+        ),
+        insert_runs AS (
+            INSERT INTO run_facets %s
+        ),
+        insert_jobs AS (
+            INSERT INTO job_facets %s
+        )
+        INSERT INTO facet_migration_lock
+        SELECT events_chunk.created_at, events_chunk.run_uuid
+        FROM events_chunk
+        """,
         V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"),
         V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"),
         V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk"));
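
Two supporting details in the constants above: the temp-table index is declared with INCLUDE (run_uuid) so the ordered pop can be answered by an index-only scan, and the row count is now estimated from pg_class.reltuples, the planner's cached statistic, rather than a full COUNT(*). A hedged sketch of the two counting strategies, assuming a Jdbi instance connected to the Marquez database; reltuples is constant-time but can lag reality until VACUUM or ANALYZE refreshes it, which is why migrate() below still confirms emptiness with an exact count.

  // Estimated count: constant-time read of a planner statistic; may be stale.
  int estimated =
      jdbi.withHandle(
          h ->
              h.createQuery("SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events'")
                  .mapTo(Integer.class)
                  .one());

  // Exact count: always correct, but scans the whole table.
  int exact =
      jdbi.withHandle(
          h -> h.createQuery("SELECT COUNT(*) FROM lineage_events").mapTo(Integer.class).one());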
@@ -140,17 +127,17 @@ public void migrate(Context context) throws Exception {
       jdbi = Jdbi.create(context.getConnection());
     }
 
-    if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
+    int estimatedEventsCount = estimateCountLineageEvents();
+    log.info("Estimating {} events in lineage_events table", estimatedEventsCount);
+    if (estimatedEventsCount == 0 && countLineageEvents() == 0) {
       // lineage_events table is empty -> no need to run migration
       // anyway. we need to create lock to mark that no data requires migration
       execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)");
 
       createTargetViews();
       return;
     }
-    Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);
-
-    if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
+    if (!manual && estimatedEventsCount >= BASIC_MIGRATION_LIMIT) {
       log.warn(
           """
           ==================================================
@@ -168,14 +155,21 @@ public void migrate(Context context) throws Exception {
       return;
     }
 
-    log.info("Configured chunkSize is {}", getChunkSize());
-    MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get());
-    while (!lock.equals(lastExpectedLock.get())) {
-      lock = backFillChunk(lock);
-      log.info(
-          "Migrating chunk finished. Still having {} records to migrate.",
-          countLineageEventsToProcess(lock));
-    }
+    jdbi.withHandle(
+        h -> {
+          h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute();
+          h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute();
+          log.info("Configured chunkSize is {}", getChunkSize());
+          boolean doMigration = true;
+          while (doMigration) {
+            int results = backFillChunk(h);
+            log.info("Migrating chunk finished processing {} records.", results);
+            if (results < 1) {
+              doMigration = false;
+            }
+          }
+          return null;
+        });
 
     createTargetViews();
     log.info("All records migrated");
@@ -195,51 +189,17 @@ private void execute(String sql) {
     jdbi.inTransaction(handle -> handle.execute(sql));
   }
 
-  private MigrationLock backFillChunk(MigrationLock lock) {
+  private int backFillChunk(Handle h) {
     String backFillQuery = getBackFillFacetsSQL();
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(backFillQuery)
-                .bind("chunkSize", getChunkSize())
-                .bind("createdAt", lock.created_at)
-                .bind("runUuid", lock.run_uuid)
-                .map(
-                    rs ->
-                        new MigrationLock(
-                            rs.getColumn(Columns.RUN_UUID, UUID.class),
-                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
-                .one());
+    return h.createUpdate(backFillQuery).bind("chunkSize", getChunkSize()).execute();
   }
 
-  private Optional<MigrationLock> getLock(String sql) {
+  private int estimateCountLineageEvents() {
     return jdbi.withHandle(
-        h ->
-            h.createQuery(sql)
-                .map(
-                    rs ->
-                        new MigrationLock(
-                            rs.getColumn(Columns.RUN_UUID, UUID.class),
-                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
-                .findFirst());
+        h -> h.createQuery(ESTIMATE_COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one());
   }
 
   private int countLineageEvents() {
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
-                .map(rs -> rs.getColumn("cnt", Integer.class))
-                .one());
+    return jdbi.withHandle(h -> h.createQuery(COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one());
  }
-
-  private int countLineageEventsToProcess(MigrationLock lock) {
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
-                .bind("createdAt", lock.created_at)
-                .bind("runUuid", lock.run_uuid)
-                .map(rs -> rs.getColumn("cnt", Integer.class))
-                .one());
-  }
-
-  private record MigrationLock(UUID run_uuid, Instant created_at) {}
 }
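
Condensed, the new control flow in migrate() is the loop below (a sketch using the constants and methods from this file, with the manual-migration guard and logging trimmed). The work queue is built once, and the loop stops when a chunk claims no runs: the final INSERT INTO facet_migration_lock ... FROM events_chunk is the statement whose row count Update#execute returns, so an empty temp table yields 0 and ends the migration.

  // Sketch, not verbatim from the commit: build the work queue once, then
  // pop-and-backfill until a chunk comes back empty.
  jdbi.withHandle(
      h -> {
        h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute(); // snapshot distinct runs
        h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute(); // index for ordered pops
        int processed;
        do {
          processed =
              h.createUpdate(getBackFillFacetsSQL())
                  .bind("chunkSize", getChunkSize())
                  .execute(); // rows written to facet_migration_lock for this chunk
        } while (processed > 0);
        return null;
      });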
38 changes: 0 additions & 38 deletions api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java
@@ -165,44 +165,6 @@ public void testMigrateForMultipleChunks() throws Exception {
     }
   }
 
-  @Test
-  public void testWhenCurrentLockIsAvailable() throws Exception {
-    FacetTestUtils.createLineageWithFacets(openLineageDao);
-    FacetTestUtils.createLineageWithFacets(openLineageDao);
-    lineageRow =
-        FacetTestUtils.createLineageWithFacets(
-            openLineageDao); // point migration_lock to only match the latest lineage event
-
-    jdbi.withHandle(
-        h ->
-            h.execute(
-                """
-                INSERT INTO facet_migration_lock
-                SELECT created_at, run_uuid FROM lineage_events
-                ORDER by created_at DESC LIMIT 1
-                """)); // last lineage row should be skipped
-
-    jdbi.withHandle(
-        h ->
-            h.execute(
-                """
-                INSERT INTO facet_migration_lock
-                SELECT created_at, run_uuid FROM lineage_events
-                ORDER by created_at DESC LIMIT 1 OFFSET 1
-                """)); // middle lineage row should be skipped
-
-    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
-      when(Jdbi.create(connection)).thenReturn(jdbi);
-      subject.setChunkSize(1);
-
-      // clear migration lock and dataset_facets table
-      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
-      subject.migrate(flywayContext);
-
-      assertThat(countDatasetFacets(jdbi)).isEqualTo(15);
-    }
-  }
-
   @Test
   public void testMigrateForLineageWithNoDatasets() throws Exception {
     LineageEvent.JobFacet jobFacet =
