From a515c91607163cd9180dfd660f6d8dca7ca70dc4 Mon Sep 17 00:00:00 2001 From: Willy Lulciuc Date: Tue, 18 Jul 2023 21:36:19 +0200 Subject: [PATCH] Add db retention support (#2486) * Add db migration to add cascade deletion on `fk`s Signed-off-by: wslulciuc * Add `DbDataRetention` and `dataRetentionInDays` config Signed-off-by: wslulciuc * Add `DbRetentionJob` Signed-off-by: wslulciuc * Add `DbRetentionCommand` Signed-off-by: wslulciuc * Add `frequencyMins` config for runs and rename `dbRetentionInDays` Signed-off-by: wslulciuc * Add docs to `DbRetentionJob` and minor renaming Signed-off-by: wslulciuc * Wrap `DbRetention.retentionOnDbOrError()` in `try/catch` Signed-off-by: wslulciuc * Add docs to DbRetention Signed-off-by: wslulciuc * continued: Add docs to `DbRetention` Signed-off-by: wslulciuc * Add handling of `errorOnDbRetention` Signed-off-by: wslulciuc * Add docs to `DbException` and `DbRetentionException` Signed-off-by: wslulciuc * `info` -> `debug` when inserting column lineage Signed-off-by: wslulciuc * Remove `dbRetention.enabled` Signed-off-by: wslulciuc * Update handling of `StatementException` Signed-off-by: wslulciuc * Minor changes Signed-off-by: wslulciuc * Add `docs/faq.md` Signed-off-by: wslulciuc * continued: `Add docs/faq.md` Signed-off-by: wslulciuc * continued: Add `docs/faq.md` Signed-off-by: wslulciuc * continued: Add `docs/faq.md` Signed-off-by: wslulciuc * Define `DEFAULT_RETENTION_DAYS` constant in `DbRetention` Signed-off-by: wslulciuc * Make chunk size in retention query configurable Signed-off-by: wslulciuc * Remove `DATA_RETENTION_IN_DAYS` from `MarquezConfig` Signed-off-by: wslulciuc * Update docs for chunk size config Signed-off-by: wslulciuc * Remove error log from `DbRetention.retentionOnDbOrError()` Signed-off-by: wslulciuc * Use `LOOP` for retention Signed-off-by: wslulciuc * continued: Use `LOOP` for retention Signed-off-by: wslulciuc * Use `numberOfRowsPerBatch` Signed-off-by: wslulciuc * Use `--number-of-rows-per-batch` Signed-off-by: wslulciuc * Add pause to prevent lock timeouts Signed-off-by: wslulciuc * Add `FOR UPDATE SKIP LOCKED` Signed-off-by: wslulciuc * Add `sql()` Signed-off-by: wslulciuc * Add `--dry-run` Signed-off-by: wslulciuc * Add `jdbi3-testcontainers` Signed-off-by: wslulciuc * Remove shortened flag args Signed-off-by: wslulciuc * Use `marquez.db.DbRetention.DEFAULT_DRY_RUN` Signed-off-by: wslulciuc * Add DbRetention.retentionOnRuns() Signed-off-by: wslulciuc * Add `DbMigration.migrateDbOrError(DataSource)` Signed-off-by: wslulciuc * Add `TestingDb` Signed-off-by: wslulciuc * Add `DbTest` Signed-off-by: wslulciuc * Add `testRetentionOnDbOrError_withDatasetsOlderThanXDays()` Signed-off-by: wslulciuc * Remove `jobs.DbRetentionConfig.dryRun` Signed-off-by: wslulciuc * Add `--dry-run` option to `faq.md` Signed-off-by: wslulciuc * continued: Add --dry-run option to faq.md Signed-off-by: wslulciuc * continued: `Add testRetentionOnDbOrError_withDatasetsOlderThanXDays` Signed-off-by: wslulciuc * Fix retention query for datasets and dataset versions Signed-off-by: wslulciuc * Add test for retention on dataset versions Signed-off-by: wslulciuc * Add comments to tests Signed-off-by: wslulciuc * Add `testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIfVersionAsInputForRun()` Signed-off-by: wslulciuc * Add `testRetentionOnDbOrErrorWithJobsOlderThanXDays()` Signed-off-by: wslulciuc * Add `testRetentionOnDbOrErrorWithJobVersionsOlderThanXDays()` Signed-off-by: wslulciuc * Add tests for dry run Signed-off-by: wslulciuc * Add 
testRetentionOnDbOrErrorWithRunsOlderThanXDays() Signed-off-by: wslulciuc * Add `testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()` Signed-off-by: wslulciuc * continued: `Add testRetentionOnDbOrErrorWithOlEventsOlderThanXDays()` Signed-off-by: wslulciuc * Add `javadocs` to `DbRetention` Signed-off-by: wslulciuc * Run tests in order of retention Signed-off-by: wslulciuc --------- Signed-off-by: wslulciuc Co-authored-by: Harel Shein --- api/build.gradle | 1 + api/src/main/java/marquez/MarquezApp.java | 63 +- api/src/main/java/marquez/MarquezConfig.java | 12 + .../java/marquez/cli/DbRetentionCommand.java | 114 +++ api/src/main/java/marquez/db/Columns.java | 1 + api/src/main/java/marquez/db/DbMigration.java | 10 +- api/src/main/java/marquez/db/DbRetention.java | 703 ++++++++++++++++++ .../main/java/marquez/db/OpenLineageDao.java | 10 +- api/src/main/java/marquez/db/RunDao.java | 1 - .../marquez/db/exceptions/DbException.java | 28 + .../db/exceptions/DbRetentionException.java | 30 + .../marquez/db/mappers/DatasetRowMapper.java | 2 + .../db/mappers/DatasetVersionRowMapper.java | 4 +- .../java/marquez/db/mappers/JobRowMapper.java | 1 + .../java/marquez/db/mappers/RunRowMapper.java | 4 +- .../java/marquez/db/models/DatasetRow.java | 10 + .../marquez/db/models/DatasetVersionRow.java | 2 + .../db/models/ExtendedDatasetVersionRow.java | 4 +- .../marquez/db/models/ExtendedRunRow.java | 4 +- .../marquez/db/models/InputFieldData.java | 2 + .../main/java/marquez/db/models/JobRow.java | 1 + .../main/java/marquez/db/models/RunRow.java | 2 + .../java/marquez/jobs/DbRetentionConfig.java | 21 + .../java/marquez/jobs/DbRetentionJob.java | 93 +++ ...63__alter_tables_add_on_cascade_delete.sql | 272 +++++++ .../java/marquez/api/models/ActiveRun.java | 111 +++ .../marquez/api/models/ApiModelGenerator.java | 267 +++++++ .../common/models/CommonModelGenerator.java | 4 + .../java/marquez/db/BackfillTestUtils.java | 1 - .../test/java/marquez/db/DbRetentionTest.java | 687 +++++++++++++++++ api/src/test/java/marquez/db/DbTest.java | 63 ++ api/src/test/java/marquez/db/DbTestUtils.java | 89 ++- api/src/test/java/marquez/db/RunDaoTest.java | 3 - api/src/test/java/marquez/db/TestingDb.java | 246 ++++++ .../marquez/db/models/DbModelGenerator.java | 302 +++++++- docs/faq.md | 41 + marquez.example.yml | 9 + 37 files changed, 3173 insertions(+), 45 deletions(-) create mode 100644 api/src/main/java/marquez/cli/DbRetentionCommand.java create mode 100644 api/src/main/java/marquez/db/DbRetention.java create mode 100644 api/src/main/java/marquez/db/exceptions/DbException.java create mode 100644 api/src/main/java/marquez/db/exceptions/DbRetentionException.java create mode 100644 api/src/main/java/marquez/jobs/DbRetentionConfig.java create mode 100644 api/src/main/java/marquez/jobs/DbRetentionJob.java create mode 100644 api/src/main/resources/marquez/db/migration/V63__alter_tables_add_on_cascade_delete.sql create mode 100644 api/src/test/java/marquez/api/models/ActiveRun.java create mode 100644 api/src/test/java/marquez/api/models/ApiModelGenerator.java create mode 100644 api/src/test/java/marquez/db/DbRetentionTest.java create mode 100644 api/src/test/java/marquez/db/DbTest.java create mode 100644 api/src/test/java/marquez/db/TestingDb.java create mode 100644 docs/faq.md diff --git a/api/build.gradle b/api/build.gradle index 8a12155ea5..aea3a38c25 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -53,6 +53,7 @@ dependencies { testImplementation "io.dropwizard:dropwizard-testing:${dropwizardVersion}" testImplementation 
"org.jdbi:jdbi3-testing:${jdbi3Version}" + testImplementation "org.jdbi:jdbi3-testcontainers:${jdbi3Version}" testImplementation "org.junit.vintage:junit-vintage-engine:${junit5Version}" testImplementation "org.testcontainers:postgresql:${testcontainersVersion}" testImplementation "org.testcontainers:junit-jupiter:${testcontainersVersion}" diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index feac31f882..7d75b1b248 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -23,15 +23,16 @@ import io.sentry.Sentry; import java.util.EnumSet; import javax.servlet.DispatcherType; -import javax.sql.DataSource; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import marquez.api.filter.JobRedirectFilter; import marquez.cli.DbMigrationCommand; +import marquez.cli.DbRetentionCommand; import marquez.cli.MetadataCommand; import marquez.cli.SeedCommand; import marquez.common.Utils; import marquez.db.DbMigration; +import marquez.jobs.DbRetentionJob; import marquez.logging.LoggingMdcFilter; import marquez.tracing.SentryConfig; import marquez.tracing.TracingContainerResponseFilter; @@ -79,6 +80,7 @@ public void initialize(@NonNull Bootstrap bootstrap) { new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED))); // Add CLI commands + bootstrap.addCommand(new DbRetentionCommand()); bootstrap.addCommand(new MetadataCommand()); bootstrap.addCommand(new SeedCommand()); @@ -97,7 +99,7 @@ public void initialize(@NonNull Bootstrap bootstrap) { @Override public void run(@NonNull MarquezConfig config, @NonNull Environment env) { final DataSourceFactory sourceFactory = config.getDataSourceFactory(); - final DataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME); + final ManagedDataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME); log.info("Running startup actions..."); @@ -124,10 +126,25 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) { env.jersey().register(new TracingContainerResponseFilter()); } - MarquezContext marquezContext = buildMarquezContext(config, env, (ManagedDataSource) source); + final Jdbi jdbi = newJdbi(config, env, source); + final MarquezContext marquezContext = + MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build(); + registerResources(config, env, marquezContext); registerServlets(env); registerFilters(env, marquezContext); + + // Add scheduled jobs to lifecycle. + if (config.hasDbRetentionPolicy()) { + // Add job to apply retention policy to database. + env.lifecycle() + .manage( + new DbRetentionJob( + jdbi, + config.getDbRetention().getFrequencyMins(), + config.getDbRetention().getNumberOfRowsPerBatch(), + config.getDbRetention().getRetentionDays())); + } } private boolean isSentryEnabled(MarquezConfig config) { @@ -135,6 +152,25 @@ private boolean isSentryEnabled(MarquezConfig config) { && !config.getSentry().getDsn().equals(SentryConfig.DEFAULT_DSN); } + /** Returns a new {@link Jdbi} object. 
*/ + private Jdbi newJdbi( + @NonNull MarquezConfig config, @NonNull Environment env, @NonNull ManagedDataSource source) { + final JdbiFactory factory = new JdbiFactory(); + final Jdbi jdbi = + factory + .build(env, config.getDataSourceFactory(), source, DB_POSTGRES) + .installPlugin(new SqlObjectPlugin()) + .installPlugin(new PostgresPlugin()) + .installPlugin(new Jackson2Plugin()); + SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics()); + if (isSentryEnabled(config)) { + sqlLogger = new TracingSQLLogger(sqlLogger); + } + jdbi.setSqlLogger(sqlLogger); + jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper()); + return jdbi; + } + public void registerResources( @NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) { @@ -156,27 +192,6 @@ protected void addDefaultCommands(Bootstrap bootstrap) { super.addDefaultCommands(bootstrap); } - private MarquezContext buildMarquezContext( - MarquezConfig config, Environment env, ManagedDataSource source) { - final JdbiFactory factory = new JdbiFactory(); - final Jdbi jdbi = - factory - .build(env, config.getDataSourceFactory(), source, DB_POSTGRES) - .installPlugin(new SqlObjectPlugin()) - .installPlugin(new PostgresPlugin()) - .installPlugin(new Jackson2Plugin()); - SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics()); - if (isSentryEnabled(config)) { - sqlLogger = new TracingSQLLogger(sqlLogger); - } - jdbi.setSqlLogger(sqlLogger); - jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper()); - - final MarquezContext context = - MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build(); - return context; - } - private void registerServlets(@NonNull Environment env) { log.debug("Registering servlets..."); diff --git a/api/src/main/java/marquez/MarquezConfig.java b/api/src/main/java/marquez/MarquezConfig.java index 6c7081d752..97263a64be 100644 --- a/api/src/main/java/marquez/MarquezConfig.java +++ b/api/src/main/java/marquez/MarquezConfig.java @@ -11,8 +11,10 @@ import io.dropwizard.db.DataSourceFactory; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import marquez.db.FlywayFactory; import marquez.graphql.GraphqlConfig; +import marquez.jobs.DbRetentionConfig; import marquez.service.models.Tag; import marquez.tracing.SentryConfig; @@ -40,4 +42,14 @@ public class MarquezConfig extends Configuration { @Getter @JsonProperty("sentry") private final SentryConfig sentry = new SentryConfig(); + + @Getter + @Setter + @JsonProperty("dbRetention") + private DbRetentionConfig dbRetention; // OPTIONAL + + /** Returns {@code true} if a data retention policy has been configured. 
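+ *
+ * A retention policy is enabled by adding a {@code dbRetention} block to {@code marquez.yml}.
+ * A minimal sketch (the values shown are the defaults defined in {@code marquez.jobs.DbRetentionConfig}):
+ *
+ * {@code
+ * dbRetention:
+ *   frequencyMins: 15
+ *   numberOfRowsPerBatch: 1000
+ *   retentionDays: 7
+ * }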
*/ + public boolean hasDbRetentionPolicy() { + return (dbRetention != null); + } } diff --git a/api/src/main/java/marquez/cli/DbRetentionCommand.java b/api/src/main/java/marquez/cli/DbRetentionCommand.java new file mode 100644 index 0000000000..bb3cda1132 --- /dev/null +++ b/api/src/main/java/marquez/cli/DbRetentionCommand.java @@ -0,0 +1,114 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.cli; + +import static marquez.db.DbRetention.DEFAULT_DRY_RUN; +import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH; +import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS; + +import io.dropwizard.cli.ConfiguredCommand; +import io.dropwizard.db.DataSourceFactory; +import io.dropwizard.db.ManagedDataSource; +import io.dropwizard.setup.Bootstrap; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.MarquezConfig; +import marquez.db.DbRetention; +import marquez.db.exceptions.DbRetentionException; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.Namespace; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.postgres.PostgresPlugin; + +/** + * A command to apply a one-off ad-hoc retention policy directly to source, dataset, and job + * metadata collected by Marquez. + * + *

Usage

+ * + * For example, to override the {@code retention-days}: + * + *
{@code
+ * java -jar marquez-api.jar db-retention --retention-days 30 marquez.yml
+ * }
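+ *
+ * To preview how many rows a retention policy would delete without applying it, pass
+ * {@code --dry-run} (a sketch using only the flags defined by this command):
+ *
+ * {@code
+ * java -jar marquez-api.jar db-retention --retention-days 30 --dry-run marquez.yml
+ * }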
+ */ +@Slf4j +public class DbRetentionCommand extends ConfiguredCommand { + private static final String DB_SOURCE_NAME = "ad-hoc-db-retention-source"; + + /* Args for 'db-retention' command. */ + private static final String CMD_ARG_NUMBER_OF_ROWS_PER_BATCH = "numberOfRowsPerBatch"; + private static final String CMD_ARG_RETENTION_DAYS = "retentionDays"; + private static final String CMD_ARG_DRY_RUN = "dryRun"; + + /* Define 'db-retention' command. */ + public DbRetentionCommand() { + super("db-retention", "apply one-off ad-hoc retention policy directly to database"); + } + + @Override + public void configure(@NonNull net.sourceforge.argparse4j.inf.Subparser subparser) { + super.configure(subparser); + // Arg '--number-of-rows-per-batch' + subparser + .addArgument("--number-of-rows-per-batch") + .dest(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH) + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_NUMBER_OF_ROWS_PER_BATCH) + .help("the number of rows deleted per batch"); + // Arg '--retention-days' + subparser + .addArgument("--retention-days") + .dest(CMD_ARG_RETENTION_DAYS) + .type(Integer.class) + .required(false) + .setDefault(DEFAULT_RETENTION_DAYS) + .help("the number of days to retain metadata"); + // Arg '--dry-run' + subparser + .addArgument("--dry-run") + .dest(CMD_ARG_DRY_RUN) + .type(Boolean.class) + .required(false) + .setDefault(DEFAULT_DRY_RUN) + .action(Arguments.storeTrue()) + .help( + "only output an estimate of metadata deleted by the retention policy, " + + "without applying the policy on database"); + } + + @Override + protected void run( + @NonNull Bootstrap bootstrap, + @NonNull Namespace namespace, + @NonNull MarquezConfig config) + throws Exception { + final int numberOfRowsPerBatch = namespace.getInt(CMD_ARG_NUMBER_OF_ROWS_PER_BATCH); + final int retentionDays = namespace.getInt(CMD_ARG_RETENTION_DAYS); + final boolean dryRun = namespace.getBoolean(CMD_ARG_DRY_RUN); + + // Configure connection. + final DataSourceFactory sourceFactory = config.getDataSourceFactory(); + final ManagedDataSource source = + sourceFactory.build(bootstrap.getMetricRegistry(), DB_SOURCE_NAME); + + // Open connection. + final Jdbi jdbi = Jdbi.create(source); + jdbi.installPlugin(new PostgresPlugin()); // Add postgres support. + + try { + // Attempt to apply a database retention policy. An exception is thrown on failed retention + // policy attempts requiring we handle the throwable and log the error. 
+ DbRetention.retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + } catch (DbRetentionException errorOnDbRetention) { + log.error( + "Failed to apply retention policy of '{}' days to database!", + retentionDays, + errorOnDbRetention); + } + } +} diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 8abc5ab162..dfdf67492a 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -140,6 +140,7 @@ private Columns() {} /* LINEAGE EVENT ROW COLUMNS */ public static final String EVENT = "event"; + public static final String EVENT_TIME = "event_time"; public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { diff --git a/api/src/main/java/marquez/db/DbMigration.java b/api/src/main/java/marquez/db/DbMigration.java index 8ce0418b9e..b735cc7843 100644 --- a/api/src/main/java/marquez/db/DbMigration.java +++ b/api/src/main/java/marquez/db/DbMigration.java @@ -16,10 +16,16 @@ public final class DbMigration { private DbMigration() {} + private static final boolean DEFAULT_MIGRATE_DB_ON_STARTUP = false; + + public static void migrateDbOrError(@NonNull final DataSource source) { + migrateDbOrError(new FlywayFactory(), source, DEFAULT_MIGRATE_DB_ON_STARTUP); + } + public static void migrateDbOrError( @NonNull final FlywayFactory flywayFactory, @NonNull final DataSource source, - final boolean migrateOnStartup) { + final boolean migrateDbOnStartup) { final Flyway flyway = flywayFactory.build(source); // Only attempt a database migration if there are pending changes to be applied, // or on the initialization of a new database. Otherwise, error on pending changes @@ -27,7 +33,7 @@ public static void migrateDbOrError( if (!hasPendingDbMigrations(flyway)) { log.info("No pending migrations found, skipping..."); return; - } else if (!migrateOnStartup && hasDbMigrationsApplied(flyway)) { + } else if (!migrateDbOnStartup && hasDbMigrationsApplied(flyway)) { errorOnPendingDbMigrations(flyway); } // Attempt to perform a database migration. An exception is thrown on failed migration attempts diff --git a/api/src/main/java/marquez/db/DbRetention.java b/api/src/main/java/marquez/db/DbRetention.java new file mode 100644 index 0000000000..fa30abefb6 --- /dev/null +++ b/api/src/main/java/marquez/db/DbRetention.java @@ -0,0 +1,703 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static marquez.common.base.MorePreconditions.checkNotBlank; + +import com.google.common.base.Stopwatch; +import java.sql.Types; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.db.exceptions.DbRetentionException; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.statement.OutParameters; +import org.jdbi.v3.core.statement.Script; + +/** + * Apply retention policy directly to source, dataset, and job metadata collected by Marquez. When + * invoking {@link DbRetention#retentionOnDbOrError(Jdbi, int, int)}, retention is applied by + * invoking the following methods in an order of precedence from first-to-last: + * + *
    + *
  • {@code retentionOnJobs()} + *
  • {@code retentionOnJobVersions()} + *
  • {@code retentionOnRuns()} + *
  • {@code retentionOnDatasets()} + *
  • {@code retentionOnDatasetVersions()} + *
  • {@code retentionOnLineageEvents()} + *
+ * + *

Applying retention is not reversible, but it can be applied many times. To perform well, + * rows are deleted in batches, dividing the deletion process into smaller chunks; the number of + * rows to delete per batch is configurable. You may also apply retention as a dry run by invoking + * {@link DbRetention#retentionOnDbOrError(Jdbi, int, int, boolean)}. By default, dry runs are + * disabled. + * + *

When retention is configured, the following operations will be applied: + * + *

    + *
  • Delete jobs from the {@code jobs} table if {@code jobs.updated_at} is older than retention days. + *
  • Delete job versions from the {@code job_versions} table if {@code job_versions.created_at} is + * older than retention days; a job version will not be deleted if it is the + * {@code current} version of a given job. + *
  • Delete runs from the {@code runs} table if {@code runs.updated_at} is older than retention days; a + * run will not be deleted if it is the {@code current} run of a given job version. + *
  • Delete datasets from the {@code datasets} table if {@code datasets.updated_at} is older than retention + * days; a dataset will not be deleted if it is an input / output of a given job + * version. + *
  • Delete dataset versions from the {@code dataset_versions} table if {@code + * dataset_versions.created_at} is older than retention days; a dataset version will not be + * deleted if it is the {@code current} version of a given dataset, + * or an input of a run. + *
  • Delete lineage events from the {@code lineage_events} table if {@code + * lineage_events.event_time} is older than retention days. + *
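+ *
+ * A minimal usage sketch (assumes an already configured {@code Jdbi} instance named {@code jdbi}):
+ *
+ * {@code
+ * try {
+ *   // Dry run: log an estimate of the rows a 7-day policy would delete (1000 rows per batch).
+ *   DbRetention.retentionOnDbOrError(jdbi, 1000, 7, true);
+ *   // Apply the 7-day retention policy to the database.
+ *   DbRetention.retentionOnDbOrError(jdbi, 1000, 7);
+ * } catch (DbRetentionException errorOnDbRetention) {
+ *   // A failed retention attempt must be handled; for example, log the error.
+ * }
+ * }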
+ */ +@Slf4j +public final class DbRetention { + private DbRetention() {} + + /* Default retention days. */ + public static final int DEFAULT_RETENTION_DAYS = 7; + + /* Default number of rows deleted per batch. */ + public static final int DEFAULT_NUMBER_OF_ROWS_PER_BATCH = 1000; + + /* Disable retention dry run by default. */ + public static final boolean DEFAULT_DRY_RUN = false; + + /** Applies the retention policy to database. */ + public static void retentionOnDbOrError( + @NonNull final Jdbi jdbi, final int numberOfRowsPerBatch, final int retentionDays) + throws DbRetentionException { + retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays, DEFAULT_DRY_RUN); + } + + /** Applies the retention policy to database; optionally as a dry run if specified. */ + public static void retentionOnDbOrError( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) + throws DbRetentionException { + if (dryRun) { + // On a dry run, add function(s) to return estimate of rows deleted (if not present). + jdbi.useHandle( + handle -> + handle.execute(CREATE_OR_REPLACE_FUNCTION_ESTIMATE_NUMBER_OF_ROWS_OLDER_THAN_X_DAYS)); + } + // Apply retention policy jobs, job versions, runs, datasets, and dataset versions. + retentionOnJobs(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + retentionOnJobVersions(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + retentionOnRuns(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + retentionOnDatasets(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + retentionOnDatasetVersions(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + + // Finally, apply retention policy to lineage events. + retentionOnLineageEvents(jdbi, numberOfRowsPerBatch, retentionDays, dryRun); + } + + /** Apply retention policy on {@code jobs}. */ + private static void retentionOnJobs( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // Get estimate of rows older than X days, then log to console. 
+ final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_JOBS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' jobs", + retentionDays, + rowsOlderThanXDaysEstimated); + return; + } + log.info("Applying retention policy of '{}' days to jobs...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_jobs_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + LOOP + WITH deleted_rows AS ( + DELETE FROM jobs + WHERE uuid IN ( + SELECT uuid + FROM jobs + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) RETURNING uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_jobs_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info("Deleted '{}' jobs in '{}' ms!", rowsDeleted, rowsDeleteTime.elapsed().toMillis()); + } + + /** Apply retention policy on {@code job versions}. */ + private static void retentionOnJobVersions( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // Get estimate of rows older than X days, then log to console. + final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_JOB_VERSIONS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' job versions", + retentionDays, + rowsOlderThanXDaysEstimated); + return; + } + log.info("Applying retention policy of '{}' days to job versions...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_job_versions_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + CREATE TEMPORARY TABLE used_job_versions_as_current_in_x_days AS ( + SELECT current_version_uuid + FROM jobs + WHERE updated_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + LOOP + WITH deleted_rows AS ( + DELETE FROM job_versions AS jv + WHERE uuid IN ( + SELECT uuid + FROM job_versions + WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) AND NOT EXISTS ( + SELECT 1 + FROM used_job_versions_as_current_in_x_days AS ujvc + WHERE jv.uuid = ujvc.current_version_uuid + ) RETURNING uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + DROP TABLE used_job_versions_as_current_in_x_days; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_job_versions_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info( + "Deleted '{}' 
job versions in '{}' ms!", rowsDeleted, rowsDeleteTime.elapsed().toMillis()); + } + + /** Apply retention policy on {@code runs}. */ + private static void retentionOnRuns( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // Get estimate of rows older than X days, then log to console. + final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_RUNS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' runs", + retentionDays, + rowsOlderThanXDaysEstimated); + return; + } + log.info("Applying retention policy of '{}' days to runs...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_runs_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + LOOP + WITH deleted_rows AS ( + DELETE FROM runs + WHERE uuid IN ( + SELECT uuid + FROM runs + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) RETURNING uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_runs_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info("Deleted '{}' runs in '{}' ms!", rowsDeleted, rowsDeleteTime.elapsed().toMillis()); + } + + /** Apply retention policy on {@code datasets}. */ + private static void retentionOnDatasets( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // On a dry run, add function(s) to return estimate of rows deleted. + jdbi.useHandle( + handle -> { + try (final Script script = + handle.createScript(sql(DRY_RUN_CREATE_TEMP_TABLES_FOR_DATASETS, retentionDays))) { + script.execute(); + } + }); + // Get estimate of rows older than X days, then log to console. + final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_DATASETS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' datasets", + retentionDays, + rowsOlderThanXDaysEstimated); + // Drop function(s) used to return estimate of rows deleted. 
+ jdbi.useHandle( + handle -> { + try (final Script script = handle.createScript(DRY_RUN_DROP_TEMP_TABLES_FOR_DATASETS)) { + script.execute(); + } + }); + return; + } + log.info("Applying retention policy of '{}' days to datasets...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_datasets_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + CREATE TEMPORARY TABLE used_datasets_as_io_in_x_days AS ( + SELECT dataset_uuid + FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv + ON jvio.job_version_uuid = jv.uuid + WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + LOOP + WITH deleted_rows AS ( + DELETE FROM datasets AS d + WHERE d.uuid IN ( + SELECT uuid + FROM datasets + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) AND NOT EXISTS ( + SELECT 1 + FROM used_datasets_as_io_in_x_days AS udaio + WHERE d.uuid = udaio.dataset_uuid + ) RETURNING uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + DROP TABLE used_datasets_as_io_in_x_days; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_datasets_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info("Deleted '{}' datasets in '{}' ms!", rowsDeleted, rowsDeleteTime.elapsed().toMillis()); + } + + /** Apply retention policy on {@code dataset versions}. */ + private static void retentionOnDatasetVersions( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // On a dry run, add function(s) to return estimate of rows deleted. + jdbi.useHandle( + handle -> { + try (final Script script = + handle.createScript( + sql(DRY_RUN_CREATE_TEMP_TABLES_FOR_DATASET_VERSIONS, retentionDays))) { + script.execute(); + } + }); + // Get estimate of rows older than X days, then log to console. + final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_DATASET_VERSIONS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' dataset versions", + retentionDays, + rowsOlderThanXDaysEstimated); + // Drop function(s) used to return estimate of rows deleted. 
+ jdbi.useHandle( + handle -> { + try (final Script script = + handle.createScript(DRY_RUN_DROP_TEMP_TABLES_FOR_DATASET_VERSIONS)) { + script.execute(); + } + }); + return; + } + log.info("Applying retention policy of '{}' days to dataset versions...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_dataset_versions_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + CREATE TEMPORARY TABLE used_dataset_versions_as_input_in_x_days AS ( + SELECT dataset_version_uuid + FROM runs_input_mapping AS ri INNER JOIN runs AS r + ON ri.run_uuid = r.uuid + WHERE r.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + CREATE TEMPORARY TABLE used_dataset_versions_as_current_in_x_days AS ( + SELECT current_version_uuid + FROM datasets + WHERE updated_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + LOOP + WITH deleted_rows AS ( + DELETE FROM dataset_versions AS dv + WHERE dv.uuid IN ( + SELECT uuid + FROM dataset_versions + WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) AND NOT EXISTS ( + SELECT 1 + FROM used_dataset_versions_as_input_in_x_days AS udvi + WHERE dv.uuid = udvi.dataset_version_uuid + ) AND NOT EXISTS ( + SELECT 1 + FROM used_dataset_versions_as_current_in_x_days AS udvc + WHERE dv.uuid = udvc.current_version_uuid + ) RETURNING uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + DROP TABLE used_dataset_versions_as_input_in_x_days; + DROP TABLE used_dataset_versions_as_current_in_x_days; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_dataset_versions_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info( + "Deleted '{}' dataset versions in '{}' ms!", + rowsDeleted, + rowsDeleteTime.elapsed().toMillis()); + } + + private static void retentionOnLineageEvents( + @NonNull final Jdbi jdbi, + final int numberOfRowsPerBatch, + final int retentionDays, + final boolean dryRun) { + if (dryRun) { + // Get estimate of rows older than X days, then log to console. 
+ final int rowsOlderThanXDaysEstimated = + estimateOfRowsOlderThanXDays( + jdbi, sql(DRY_RUN_DELETE_FROM_LINEAGE_EVENTS_OLDER_THAN_X_DAYS, retentionDays)); + log.info( + "A retention policy of '{}' days will delete (estimated): '{}' lineage events", + retentionDays, + rowsOlderThanXDaysEstimated); + return; + } + log.info("Applying retention policy of '{}' days to lineage events...", retentionDays); + final Stopwatch rowsDeleteTime = Stopwatch.createStarted(); + final int rowsDeleted = + jdbi.withHandle( + handle -> { + handle.execute( + sql( + """ + CREATE OR REPLACE FUNCTION delete_lineage_events_older_than_x_days() + RETURNS INT AS $$ + DECLARE + rows_per_batch INT := ${numberOfRowsPerBatch}; + rows_deleted INT; + rows_deleted_total INT := 0; + BEGIN + LOOP + WITH deleted_rows AS ( + DELETE FROM lineage_events + WHERE run_uuid IN ( + SELECT run_uuid + FROM lineage_events + WHERE event_time < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + FOR UPDATE SKIP LOCKED + LIMIT rows_per_batch + ) RETURNING run_uuid + ) + SELECT COUNT(*) INTO rows_deleted FROM deleted_rows; + rows_deleted_total := rows_deleted_total + rows_deleted; + EXIT WHEN rows_deleted = 0; + PERFORM pg_sleep(0.1); + END LOOP; + RETURN rows_deleted_total; + END; + $$ LANGUAGE plpgsql;""", + numberOfRowsPerBatch, + retentionDays)); + return callWith(handle, "delete_lineage_events_older_than_x_days()"); + }); + rowsDeleteTime.stop(); + log.info( + "Deleted '{}' lineage events in '{}' ms!", + rowsDeleted, + rowsDeleteTime.elapsed().toMillis()); + } + + /** + * Returns generated {@code sql} using the {@code sqlTemplate} and the provided values for {@code + * numberOfRowsPerBatch} and {@code retentionDays}. + */ + private static String sql( + @NonNull final String sqlTemplate, final int numberOfRowsPerBatch, final int retentionDays) { + return checkNotBlank(sqlTemplate) + .replace("${numberOfRowsPerBatch}", String.valueOf(numberOfRowsPerBatch)) + .replace("${retentionDays}", String.valueOf(retentionDays)); + } + + /** + * Returns {@code sql} using the {@code sqlTemplate} and the provided value for {@code + * retentionDays}. + */ + private static String sql(@NonNull final String sqlTemplate, final int retentionDays) { + return checkNotBlank(sqlTemplate).replace("${retentionDays}", String.valueOf(retentionDays)); + } + + /** Returns estimate of rows older than X days. */ + private static int estimateOfRowsOlderThanXDays( + @NonNull final Jdbi jdbi, @NonNull final String retentionQuery) { + return jdbi.withHandle( + handle -> { + final OutParameters result = + handle + .createCall( + "{:estimate = call estimate_number_of_rows_older_than_x_days(:retentionQuery)}") + .bind("retentionQuery", retentionQuery) + .registerOutParameter("estimate", Types.INTEGER) + .invoke(); + return result.getInt("estimate"); + }); + } + + /** Call function with the specified {@code handle}. */ + private static int callWith(@NonNull final Handle handle, @NonNull String functionName) { + final OutParameters result = + handle + .createCall( + """ + {:rows_deleted_total = call ${functionName}} + """ + .replace("${functionName}", functionName)) + .registerOutParameter("rows_deleted_total", Types.INTEGER) + .invoke(); + return result.getInt("rows_deleted_total"); + } + + /** Create {@code estimate_number_of_rows_older_than_x_days()}. 
*/ + private static final String CREATE_OR_REPLACE_FUNCTION_ESTIMATE_NUMBER_OF_ROWS_OLDER_THAN_X_DAYS = + """ + CREATE OR REPLACE FUNCTION estimate_number_of_rows_older_than_x_days(retention_query TEXT) + RETURNS INT AS $$ + DECLARE + query_plan_as_json JSON; + BEGIN + EXECUTE 'EXPLAIN (FORMAT JSON) ' || retention_query INTO query_plan_as_json; + RETURN (query_plan_as_json -> 0 -> 'Plan' ->> 'Plan Rows')::INT; + END; + $$ LANGUAGE plpgsql; + """; + + private static final String DRY_RUN_DELETE_FROM_JOBS_OLDER_THAN_X_DAYS = + """ + DELETE FROM jobs + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + """; + + private static final String DRY_RUN_DELETE_FROM_JOB_VERSIONS_OLDER_THAN_X_DAYS = + """ + DELETE FROM job_versions + WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + """; + + private static final String DRY_RUN_DELETE_FROM_RUNS_OLDER_THAN_X_DAYS = + """ + DELETE FROM runs + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + """; + + /** Create {@code used_datasets_as_input_in_x_days()}. */ + private static final String DRY_RUN_CREATE_TEMP_TABLES_FOR_DATASETS = + """ + CREATE TEMPORARY TABLE used_datasets_as_input_in_x_days AS ( + SELECT dataset_uuid + FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv + ON jvio.job_version_uuid = jv.uuid + WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + AND jvio.io_type = 'INPUT' + ); + """; + + private static final String DRY_RUN_DELETE_FROM_DATASETS_OLDER_THAN_X_DAYS = + """ + DELETE FROM datasets AS d + WHERE d.uuid IN ( + SELECT uuid + FROM datasets + WHERE updated_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ) AND NOT EXISTS ( + SELECT 1 + FROM used_datasets_as_input_in_x_days AS udi + WHERE d.uuid = udi.dataset_uuid + ) + """; + + /** Drop {@code used_datasets_as_input_in_x_days()} */ + private static final String DRY_RUN_DROP_TEMP_TABLES_FOR_DATASETS = + """ + DROP TABLE used_datasets_as_input_in_x_days; + """; + + /** + * Create {@code used_dataset_versions_as_input_in_x_days()} and {@code + * used_dataset_versions_as_current_in_x_days()}. + */ + private static final String DRY_RUN_CREATE_TEMP_TABLES_FOR_DATASET_VERSIONS = + """ + CREATE TEMPORARY TABLE used_dataset_versions_as_input_in_x_days AS ( + SELECT dataset_version_uuid + FROM runs_input_mapping AS ri INNER JOIN runs AS r + ON ri.run_uuid = r.uuid + WHERE r.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + CREATE TEMPORARY TABLE used_dataset_versions_as_current_in_x_days AS ( + SELECT current_version_uuid + FROM datasets + WHERE updated_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ); + """; + + private static final String DRY_RUN_DELETE_FROM_DATASET_VERSIONS_OLDER_THAN_X_DAYS = + """ + DELETE FROM dataset_versions AS dv + WHERE dv.uuid IN ( + SELECT uuid + FROM dataset_versions + WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + ) AND NOT EXISTS ( + SELECT 1 + FROM used_dataset_versions_as_input_in_x_days AS udvi + WHERE dv.uuid = udvi.dataset_version_uuid + ) OR NOT EXISTS ( + SELECT 1 + FROM used_dataset_versions_as_current_in_x_days AS udvc + WHERE dv.uuid = udvc.current_version_uuid + ) RETURNING uuid + """; + + /** + * Drop {@code used_dataset_versions_as_input_in_x_days()} and {@code + * used_dataset_versions_as_current_in_x_days()}. 
+ */ + private static final String DRY_RUN_DROP_TEMP_TABLES_FOR_DATASET_VERSIONS = + """ + DROP TABLE used_dataset_versions_as_input_in_x_days; + DROP TABLE used_dataset_versions_as_current_in_x_days; + """; + + private static final String DRY_RUN_DELETE_FROM_LINEAGE_EVENTS_OLDER_THAN_X_DAYS = + """ + DELETE FROM lineage_events + WHERE event_time < CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days' + """; +} diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 5f1b30c545..72dba40d26 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -220,7 +220,6 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper runArgs.getUuid(), nominalStartTime, nominalEndTime, - namespace.getUuid(), namespace.getName(), job.getName(), job.getLocation()); @@ -800,8 +799,11 @@ private List upsertColumnLineage( ColumnLineageDao columnLineageDao, DatasetFieldDao datasetFieldDao, DatasetVersionRow datasetVersionRow) { + Logger log = LoggerFactory.getLogger(OpenLineageDao.class); + // get all the fields related to this particular run List runFields = datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid); + log.debug("Found input datasets fields for run '{}': {}", runUuid, runFields); return Optional.ofNullable(ds.getFacets()) .map(DatasetFacets::getColumnLineage) @@ -821,7 +823,6 @@ private List upsertColumnLineage( datasetFields.stream().filter(dfr -> dfr.getName().equals(columnName)).findAny(); if (outputField.isEmpty()) { - Logger log = LoggerFactory.getLogger(OpenLineageDao.class); log.error( "Cannot produce column lineage for missing output field in output dataset: {}", columnName); @@ -848,6 +849,11 @@ private List upsertColumnLineage( fieldData.getDatasetFieldUuid())) .collect(Collectors.toList()); + log.debug( + "Adding column lineage on output field '{}' for dataset version '{}' with input fields: {}", + outputField.get().getName(), + datasetVersionRow.getUuid(), + inputFields); return columnLineageDao .upsertColumnLineageRow( datasetVersionRow.getUuid(), diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 57a708bd64..2d3b6e403a 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -301,7 +301,6 @@ RunRow upsert( UUID runArgsUuid, Instant nominalStartTime, Instant nominalEndTime, - UUID namespaceUuid, String namespaceName, String jobName, String location); diff --git a/api/src/main/java/marquez/db/exceptions/DbException.java b/api/src/main/java/marquez/db/exceptions/DbException.java new file mode 100644 index 0000000000..b17479c9f7 --- /dev/null +++ b/api/src/main/java/marquez/db/exceptions/DbException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.exceptions; + +import javax.annotation.Nullable; + +/** An exception thrown to indicate a database error. */ +public class DbException extends Exception { + private static final long serialVersionUID = 1L; + + /** Constructs a {@code DbException} with the provided {@code message}. */ + public DbException(@Nullable final String message) { + super(message); + } + + /** Constructs a {@code DbException} with the provided {@code cause}. 
*/ + DbException(@Nullable final Throwable cause) { + super(cause); + } + + /** Constructs a {@code DbException} with the provided {@code message} and the {@code cause}. */ + DbException(@Nullable final String message, @Nullable final Throwable cause) { + super(message, cause); + } +} diff --git a/api/src/main/java/marquez/db/exceptions/DbRetentionException.java b/api/src/main/java/marquez/db/exceptions/DbRetentionException.java new file mode 100644 index 0000000000..6a44a44819 --- /dev/null +++ b/api/src/main/java/marquez/db/exceptions/DbRetentionException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.exceptions; + +import javax.annotation.Nullable; + +/** An exception thrown to indicate a database retention policy error. */ +public final class DbRetentionException extends DbException { + + /** Constructs a {@code DbRetentionException} with the provided {@code message}. */ + public DbRetentionException(@Nullable String message) { + super(message); + } + + /** Constructs a {@code DbRetentionException} with the provided {@code cause}. */ + public DbRetentionException(@Nullable final Throwable cause) { + super(cause); + } + + /** + * Constructs a {@code DbRetentionException} with the provided {@code message} and the {@code + * cause}. + */ + public DbRetentionException(@Nullable final String message, @Nullable final Throwable cause) { + super(message, cause); + } +} diff --git a/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java b/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java index 00cc066282..de60957563 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetRowMapper.java @@ -31,7 +31,9 @@ public DatasetRow map(@NonNull ResultSet results, @NonNull StatementContext cont timestampOrThrow(results, Columns.CREATED_AT), timestampOrThrow(results, Columns.UPDATED_AT), uuidOrThrow(results, Columns.NAMESPACE_UUID), + stringOrNull(results, Columns.NAMESPACE_NAME), uuidOrThrow(results, Columns.SOURCE_UUID), + stringOrNull(results, Columns.SOURCE_NAME), stringOrThrow(results, Columns.NAME), stringOrThrow(results, Columns.PHYSICAL_NAME), timestampOrNull(results, Columns.LAST_MODIFIED_AT), diff --git a/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java b/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java index 79d922fbcd..4cd6febcfb 100644 --- a/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/DatasetVersionRowMapper.java @@ -28,6 +28,8 @@ public DatasetVersionRow map(@NonNull ResultSet results, @NonNull StatementConte uuidOrThrow(results, Columns.DATASET_UUID), uuidOrThrow(results, Columns.VERSION), stringOrNull(results, Columns.LIFECYCLE_STATE), - uuidOrNull(results, Columns.RUN_UUID)); + uuidOrNull(results, Columns.RUN_UUID), + stringOrNull(results, Columns.NAMESPACE_NAME), + stringOrNull(results, Columns.DATASET_NAME)); } } diff --git a/api/src/main/java/marquez/db/mappers/JobRowMapper.java b/api/src/main/java/marquez/db/mappers/JobRowMapper.java index 0ac62d9cc5..b16eb1dc74 100644 --- a/api/src/main/java/marquez/db/mappers/JobRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobRowMapper.java @@ -40,6 +40,7 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context) stringOrThrow(results, Columns.TYPE), timestampOrThrow(results, Columns.CREATED_AT), timestampOrThrow(results, 
Columns.UPDATED_AT), + uuidOrNull(results, Columns.NAMESPACE_UUID), stringOrThrow(results, Columns.NAMESPACE_NAME), stringOrThrow(results, Columns.NAME), stringOrThrow(results, Columns.SIMPLE_NAME), diff --git a/api/src/main/java/marquez/db/mappers/RunRowMapper.java b/api/src/main/java/marquez/db/mappers/RunRowMapper.java index 4c4f4027e6..8d22afb11d 100644 --- a/api/src/main/java/marquez/db/mappers/RunRowMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunRowMapper.java @@ -47,7 +47,9 @@ public RunRow map(@NonNull ResultSet results, @NonNull StatementContext context) : null, uuidOrNull(results, Columns.START_RUN_STATE_UUID), columnNames.contains(Columns.ENDED_AT) ? timestampOrNull(results, Columns.ENDED_AT) : null, - uuidOrNull(results, Columns.END_RUN_STATE_UUID)); + uuidOrNull(results, Columns.END_RUN_STATE_UUID), + stringOrNull(results, Columns.JOB_NAME), + stringOrNull(results, Columns.NAMESPACE_NAME)); } private List toDatasetVersion(ResultSet rs, String column) throws SQLException { diff --git a/api/src/main/java/marquez/db/models/DatasetRow.java b/api/src/main/java/marquez/db/models/DatasetRow.java index 08e0b5c638..830b5de045 100644 --- a/api/src/main/java/marquez/db/models/DatasetRow.java +++ b/api/src/main/java/marquez/db/models/DatasetRow.java @@ -15,6 +15,9 @@ import lombok.NonNull; import lombok.ToString; import lombok.With; +import marquez.common.models.DatasetId; +import marquez.common.models.DatasetName; +import marquez.common.models.NamespaceName; @AllArgsConstructor @EqualsAndHashCode @@ -25,7 +28,9 @@ public class DatasetRow { @Getter @NonNull private final Instant createdAt; @Getter @NonNull private final Instant updatedAt; @Getter @NonNull private final UUID namespaceUuid; + @Getter @Nullable private final String namespaceName; @Getter @NonNull private final UUID sourceUuid; + @Getter @Nullable private final String sourceName; @Getter @NonNull private final String name; @Getter @NonNull private final String physicalName; @Nullable private final Instant lastModifiedAt; @@ -44,4 +49,9 @@ public Optional getDescription() { public Optional getCurrentVersionUuid() { return Optional.ofNullable(currentVersionUuid); } + + /** ... 
*/ + public DatasetId toDatasetId() { + return new DatasetId(NamespaceName.of(namespaceName), DatasetName.of(name)); + } } diff --git a/api/src/main/java/marquez/db/models/DatasetVersionRow.java b/api/src/main/java/marquez/db/models/DatasetVersionRow.java index 51266ddb95..50d00d6406 100644 --- a/api/src/main/java/marquez/db/models/DatasetVersionRow.java +++ b/api/src/main/java/marquez/db/models/DatasetVersionRow.java @@ -25,6 +25,8 @@ public class DatasetVersionRow { @Getter @NonNull private final UUID version; @Getter @Nullable private final String lifecycleState; @Nullable private final UUID runUuid; + @Getter @Nullable private final String namespaceName; + @Getter @Nullable private final String datasetName; public Optional getRunUuid() { return Optional.ofNullable(runUuid); diff --git a/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java b/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java index 816740a92c..7a022a0203 100644 --- a/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedDatasetVersionRow.java @@ -15,6 +15,7 @@ @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) +@Deprecated public class ExtendedDatasetVersionRow extends DatasetVersionRow { @Getter private @NonNull String namespaceName; @Getter private @NonNull String datasetName; @@ -28,7 +29,8 @@ public ExtendedDatasetVersionRow( UUID runUuid, @NonNull final String namespaceName, @NonNull final String datasetName) { - super(uuid, createdAt, datasetUuid, version, lifecycleState, runUuid); + super( + uuid, createdAt, datasetUuid, version, lifecycleState, runUuid, namespaceName, datasetName); this.namespaceName = namespaceName; this.datasetName = datasetName; } diff --git a/api/src/main/java/marquez/db/models/ExtendedRunRow.java b/api/src/main/java/marquez/db/models/ExtendedRunRow.java index 13678b4d3d..d4487318f5 100644 --- a/api/src/main/java/marquez/db/models/ExtendedRunRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedRunRow.java @@ -60,7 +60,9 @@ public ExtendedRunRow( startedAt, startRunStateUuid, endedAt, - endRunStateUuid); + endRunStateUuid, + jobName, + namespaceName); this.inputVersions = inputVersions; this.outputVersions = outputVersions; this.args = args; diff --git a/api/src/main/java/marquez/db/models/InputFieldData.java b/api/src/main/java/marquez/db/models/InputFieldData.java index 56e2084984..fcdd7932f2 100644 --- a/api/src/main/java/marquez/db/models/InputFieldData.java +++ b/api/src/main/java/marquez/db/models/InputFieldData.java @@ -9,9 +9,11 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; +import lombok.ToString; @Getter @AllArgsConstructor +@ToString public class InputFieldData { @NonNull String namespace; @NonNull String datasetName; diff --git a/api/src/main/java/marquez/db/models/JobRow.java b/api/src/main/java/marquez/db/models/JobRow.java index a71b60fdd8..fe3fede298 100644 --- a/api/src/main/java/marquez/db/models/JobRow.java +++ b/api/src/main/java/marquez/db/models/JobRow.java @@ -20,6 +20,7 @@ public class JobRow { @NonNull String type; @NonNull Instant createdAt; @NonNull Instant updatedAt; + @Nullable UUID namespaceUuid; @NonNull String namespaceName; @NonNull String name; @NonNull String simpleName; diff --git a/api/src/main/java/marquez/db/models/RunRow.java b/api/src/main/java/marquez/db/models/RunRow.java index 8a5ecaf652..4b0c14aa07 100644 --- a/api/src/main/java/marquez/db/models/RunRow.java +++ 
b/api/src/main/java/marquez/db/models/RunRow.java @@ -33,6 +33,8 @@ public class RunRow { @Nullable private final UUID startRunStateUuid; @Nullable private final Instant endedAt; @Nullable private final UUID endRunStateUuid; + @Getter @Nullable private final String jobName; + @Getter @Nullable private final String namespaceName; public Optional getParentRunUuid() { return Optional.ofNullable(parentRunUuid); diff --git a/api/src/main/java/marquez/jobs/DbRetentionConfig.java b/api/src/main/java/marquez/jobs/DbRetentionConfig.java new file mode 100644 index 0000000000..f60dd066c6 --- /dev/null +++ b/api/src/main/java/marquez/jobs/DbRetentionConfig.java @@ -0,0 +1,21 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.jobs; + +import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH; +import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS; + +import lombok.Getter; +import lombok.Setter; + +/** Configuration for {@link DbRetentionJob}. */ +public final class DbRetentionConfig { + public static final int DEFAULT_FREQUENCY_MINS = 15; + + @Getter @Setter private int frequencyMins = DEFAULT_FREQUENCY_MINS; + @Getter @Setter private int numberOfRowsPerBatch = DEFAULT_NUMBER_OF_ROWS_PER_BATCH; + @Getter @Setter private int retentionDays = DEFAULT_RETENTION_DAYS; +} diff --git a/api/src/main/java/marquez/jobs/DbRetentionJob.java b/api/src/main/java/marquez/jobs/DbRetentionJob.java new file mode 100644 index 0000000000..42b75ebd0e --- /dev/null +++ b/api/src/main/java/marquez/jobs/DbRetentionJob.java @@ -0,0 +1,93 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.jobs; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.AbstractScheduledService; +import io.dropwizard.lifecycle.Managed; +import java.time.Duration; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.db.DbRetention; +import marquez.db.exceptions.DbRetentionException; +import org.jdbi.v3.core.Jdbi; + +/** + * A job that applies a retention policy on a fixed schedule to source, dataset, and job metadata in + * Marquez. Use {@code frequencyMins} in {@link DbRetentionConfig} to override the default job run + * frequency interval of {@code 15} mins. You can also use {@code retentionDays} to override the + * default retention policy of {@code 7} days; metadata with a collection date {@code > + * retentionDays} will be deleted. To limit the number of metadata purged per retention execution + * and reduce impact on the database, we recommend adjusting {@code numberOfRowsPerBatch}. + */ +@Slf4j +public class DbRetentionJob extends AbstractScheduledService implements Managed { + private static final Duration NO_DELAY = Duration.ofMinutes(0); + + /* The number of rows deleted per batch. */ + private final int numberOfRowsPerBatch; + + /* The retention days. */ + private final int retentionDays; + + private final Scheduler fixedRateScheduler; + private final Jdbi jdbi; + + /** + * Constructs a {@code DbRetentionJob} with a run frequency {@code frequencyMins}, chunk size of + * {@code numberOfRowsPerBatch} that can be deleted per retention job execution and retention days + * of {@code retentionDays}. 
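+ *
+ * A minimal sketch of how the job is registered (mirrors the wiring in {@code MarquezApp};
+ * assumes a configured {@code Jdbi} instance and a Dropwizard {@code Environment} named {@code env}):
+ *
+ * {@code
+ * // Apply a 7-day retention policy every 15 mins, deleting up to 1000 rows per batch.
+ * env.lifecycle().manage(new DbRetentionJob(jdbi, 15, 1000, 7));
+ * }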
+ */ + public DbRetentionJob( + @NonNull final Jdbi jdbi, + final int frequencyMins, + final int numberOfRowsPerBatch, + final int retentionDays) { + checkArgument(frequencyMins > 0, "'frequencyMins' must be > 0"); + checkArgument(numberOfRowsPerBatch > 0, "'numberOfRowsPerBatch' must be > 0"); + checkArgument(retentionDays > 0, "'retentionDays' must be > 0"); + this.numberOfRowsPerBatch = numberOfRowsPerBatch; + this.retentionDays = retentionDays; + + this.jdbi = jdbi; + + // Define fixed schedule with no delay. + this.fixedRateScheduler = + Scheduler.newFixedRateSchedule(NO_DELAY, Duration.ofMinutes(frequencyMins)); + } + + @Override + protected Scheduler scheduler() { + return fixedRateScheduler; + } + + @Override + public void start() throws Exception { + log.info("Starting db retention job..."); + startAsync().awaitRunning(); + } + + @Override + protected void runOneIteration() { + try { + // Attempt to apply a database retention policy. An exception is thrown on failed retention + // policy attempts requiring we handle the throwable and log the error. + DbRetention.retentionOnDbOrError(jdbi, numberOfRowsPerBatch, retentionDays); + } catch (DbRetentionException errorOnDbRetention) { + log.error( + "Failed to apply retention policy of '{}' days to database!", + retentionDays, + errorOnDbRetention); + } + } + + @Override + public void stop() throws Exception { + log.info("Stopping db retention job..."); + stopAsync().awaitTerminated(); + } +} diff --git a/api/src/main/resources/marquez/db/migration/V63__alter_tables_add_on_cascade_delete.sql b/api/src/main/resources/marquez/db/migration/V63__alter_tables_add_on_cascade_delete.sql new file mode 100644 index 0000000000..7c3673b86d --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V63__alter_tables_add_on_cascade_delete.sql @@ -0,0 +1,272 @@ +ALTER TABLE column_lineage +DROP CONSTRAINT column_lineage_output_dataset_version_uuid_fkey, +ADD CONSTRAINT column_lineage_output_dataset_version_uuid_fkey + FOREIGN KEY (output_dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE column_lineage +DROP CONSTRAINT column_lineage_output_dataset_field_uuid_fkey, +ADD CONSTRAINT column_lineage_output_dataset_field_uuid_fkey + FOREIGN KEY (output_dataset_field_uuid) + REFERENCES dataset_fields(uuid) + ON DELETE CASCADE; + +ALTER TABLE column_lineage +DROP CONSTRAINT column_lineage_input_dataset_version_uuid_fkey, +ADD CONSTRAINT column_lineage_input_dataset_version_uuid_fkey + FOREIGN KEY (input_dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE column_lineage +DROP CONSTRAINT column_lineage_input_dataset_field_uuid_fkey, +ADD CONSTRAINT column_lineage_input_dataset_field_uuid_fkey + FOREIGN KEY (input_dataset_field_uuid) + REFERENCES dataset_fields(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_facets +DROP CONSTRAINT dataset_facets_dataset_uuid_fkey, +ADD CONSTRAINT dataset_facets_dataset_uuid_fkey + FOREIGN KEY (dataset_uuid) + REFERENCES datasets(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_facets +DROP CONSTRAINT dataset_facets_run_uuid_fkey, +ADD CONSTRAINT dataset_facets_run_uuid_fkey + FOREIGN KEY (run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_facets +DROP CONSTRAINT dataset_facets_dataset_version_uuid_fkey, +ADD CONSTRAINT dataset_facets_dataset_version_uuid_fkey + FOREIGN KEY (dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_fields +DROP CONSTRAINT 
dataset_fields_dataset_uuid_fkey, +ADD CONSTRAINT dataset_fields_dataset_uuid_fkey + FOREIGN KEY (dataset_uuid) + REFERENCES datasets(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_fields_tag_mapping +DROP CONSTRAINT dataset_fields_tag_mapping_tag_uuid_fkey, +ADD CONSTRAINT dataset_fields_tag_mapping_tag_uuid_fkey + FOREIGN KEY (tag_uuid) + REFERENCES tags(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_fields_tag_mapping +DROP CONSTRAINT dataset_fields_tag_mapping_dataset_field_uuid_fkey, +ADD CONSTRAINT dataset_fields_tag_mapping_dataset_field_uuid_fkey + FOREIGN KEY (dataset_field_uuid) + REFERENCES dataset_fields(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_symlinks +DROP CONSTRAINT dataset_symlinks_namespace_uuid_fkey, +ADD CONSTRAINT dataset_symlinks_namespace_uuid_fkey + FOREIGN KEY (namespace_uuid) + REFERENCES namespaces(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_versions +DROP CONSTRAINT dataset_versions_dataset_uuid_fkey, +ADD CONSTRAINT dataset_versions_dataset_uuid_fkey + FOREIGN KEY (dataset_uuid) + REFERENCES datasets(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_versions_field_mapping +DROP CONSTRAINT dataset_versions_field_mapping_dataset_version_uuid_fkey, +ADD CONSTRAINT dataset_versions_field_mapping_dataset_version_uuid_fkey + FOREIGN KEY (dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE dataset_versions_field_mapping +DROP CONSTRAINT dataset_versions_field_mapping_dataset_field_uuid_fkey, +ADD CONSTRAINT dataset_versions_field_mapping_dataset_field_uuid_fkey + FOREIGN KEY (dataset_field_uuid) + REFERENCES dataset_fields(uuid) + ON DELETE CASCADE; + +ALTER TABLE datasets +DROP CONSTRAINT datasets_namespace_uuid_fkey, +ADD CONSTRAINT datasets_namespace_uuid_fkey + FOREIGN KEY (namespace_uuid) + REFERENCES namespaces(uuid) + ON DELETE CASCADE; + +ALTER TABLE datasets +DROP CONSTRAINT datasets_source_uuid_fkey, +ADD CONSTRAINT datasets_source_uuid_fkey + FOREIGN KEY (source_uuid) + REFERENCES sources(uuid) + ON DELETE CASCADE; + +ALTER TABLE datasets_tag_mapping +DROP CONSTRAINT datasets_tag_mapping_dataset_uuid_fkey, +ADD CONSTRAINT datasets_tag_mapping_dataset_uuid_fkey + FOREIGN KEY (dataset_uuid) + REFERENCES datasets(uuid) + ON DELETE CASCADE; + +ALTER TABLE datasets_tag_mapping +DROP CONSTRAINT datasets_tag_mapping_tag_uuid_fkey, +ADD CONSTRAINT datasets_tag_mapping_tag_uuid_fkey + FOREIGN KEY (tag_uuid) + REFERENCES tags(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_facets +DROP CONSTRAINT job_facets_job_uuid_fkey, +ADD CONSTRAINT job_facets_job_uuid_fkey + FOREIGN KEY (job_uuid) + REFERENCES jobs(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_facets +DROP CONSTRAINT job_facets_run_uuid_fkey, +ADD CONSTRAINT job_facets_run_uuid_fkey + FOREIGN KEY (run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_versions +DROP CONSTRAINT job_versions_job_uuid_fkey, +ADD CONSTRAINT job_versions_job_uuid_fkey + FOREIGN KEY (job_uuid) + REFERENCES jobs(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_versions +DROP CONSTRAINT job_versions_job_uuid_fkey, +ADD CONSTRAINT job_versions_job_uuid_fkey + FOREIGN KEY (job_uuid) + REFERENCES jobs(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_versions_io_mapping +DROP CONSTRAINT job_versions_io_mapping_dataset_uuid_fkey, +ADD CONSTRAINT job_versions_io_mapping_dataset_uuid_fkey + FOREIGN KEY (dataset_uuid) + REFERENCES datasets(uuid) + ON DELETE CASCADE; + +ALTER TABLE job_versions_io_mapping +DROP CONSTRAINT job_versions_io_mapping_job_version_uuid_fkey, 
+ADD CONSTRAINT job_versions_io_mapping_job_version_uuid_fkey + FOREIGN KEY (job_version_uuid) + REFERENCES job_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE jobs +DROP CONSTRAINT jobs_parent_fk_jobs, +ADD CONSTRAINT jobs_parent_fk_jobs + FOREIGN KEY (parent_job_uuid) + REFERENCES jobs(uuid) + ON DELETE CASCADE; + +ALTER TABLE jobs +DROP CONSTRAINT jobs_symlink_target_uuid_fkey, +ADD CONSTRAINT jobs_symlink_target_uuid_fkey + FOREIGN KEY (symlink_target_uuid) + REFERENCES jobs(uuid) + ON DELETE CASCADE; + +ALTER TABLE jobs +DROP CONSTRAINT jobs_namespace_uuid_fkey, +ADD CONSTRAINT jobs_namespace_uuid_fkey + FOREIGN KEY (namespace_uuid) + REFERENCES namespaces(uuid) + ON DELETE CASCADE; + +ALTER TABLE namespace_ownerships +DROP CONSTRAINT namespace_ownerships_namespace_uuid_fkey, +ADD CONSTRAINT namespace_ownerships_namespace_uuid_fkey + FOREIGN KEY (namespace_uuid) + REFERENCES namespaces(uuid) + ON DELETE CASCADE; + +ALTER TABLE namespace_ownerships +DROP CONSTRAINT namespace_ownerships_owner_uuid_fkey, +ADD CONSTRAINT namespace_ownerships_owner_uuid_fkey + FOREIGN KEY (owner_uuid) + REFERENCES owners(uuid) + ON DELETE CASCADE; + +ALTER TABLE run_facets +DROP CONSTRAINT run_facets_run_uuid_fkey, +ADD CONSTRAINT run_facets_run_uuid_fkey + FOREIGN KEY (run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE run_states +DROP CONSTRAINT run_states_run_uuid_fkey, +ADD CONSTRAINT run_states_run_uuid_fkey + FOREIGN KEY (run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs +DROP CONSTRAINT runs_start_run_state_uuid_fkey, +ADD CONSTRAINT runs_start_run_state_uuid_fkey + FOREIGN KEY (start_run_state_uuid) + REFERENCES run_states(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs +DROP CONSTRAINT runs_parent_fk_runs, +ADD CONSTRAINT runs_parent_fk_runs + FOREIGN KEY (parent_run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs +DROP CONSTRAINT runs_end_run_state_uuid_fkey, +ADD CONSTRAINT runs_end_run_state_uuid_fkey + FOREIGN KEY (end_run_state_uuid) + REFERENCES run_states(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs +DROP CONSTRAINT runs_run_args_uuid_fkey, +ADD CONSTRAINT runs_run_args_uuid_fkey + FOREIGN KEY (run_args_uuid) + REFERENCES run_args(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs +DROP CONSTRAINT runs_job_version_uuid_fkey, +ADD CONSTRAINT runs_job_version_uuid_fkey + FOREIGN KEY (job_version_uuid) + REFERENCES job_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs_input_mapping +DROP CONSTRAINT runs_input_mapping_dataset_version_uuid_fkey, +ADD CONSTRAINT runs_input_mapping_dataset_version_uuid_fkey + FOREIGN KEY (dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; + +ALTER TABLE runs_input_mapping +DROP CONSTRAINT runs_input_mapping_run_uuid_fkey, +ADD CONSTRAINT runs_input_mapping_run_uuid_fkey + FOREIGN KEY (run_uuid) + REFERENCES runs(uuid) + ON DELETE CASCADE; + +ALTER TABLE stream_versions +DROP CONSTRAINT stream_versions_dataset_version_uuid_fkey, +ADD CONSTRAINT stream_versions_dataset_version_uuid_fkey + FOREIGN KEY (dataset_version_uuid) + REFERENCES dataset_versions(uuid) + ON DELETE CASCADE; diff --git a/api/src/test/java/marquez/api/models/ActiveRun.java b/api/src/test/java/marquez/api/models/ActiveRun.java new file mode 100644 index 0000000000..443e5014bf --- /dev/null +++ b/api/src/test/java/marquez/api/models/ActiveRun.java @@ -0,0 +1,111 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
marquez.api.models; + +import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.START; + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.NonNull; + +public class ActiveRun { + // ... + private final OpenLineage ol; + private final OpenLineageClient olClient; + + // ... + @Getter private final OpenLineage.Run run; + @Getter private final OpenLineage.Job job; + @Getter private final List inputs; + @Getter private final List outputs; + + private ActiveRun( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final OpenLineage.Run run, + @NonNull final OpenLineage.Job job, + @Nullable final List inputs, + @Nullable final List outputs) { + this.ol = ol; + this.olClient = olClient; + this.run = run; + this.job = job; + this.inputs = inputs; + this.outputs = outputs; + } + + public UUID getRunId() { + return run.getRunId(); + } + + public String getJobName() { + return job.getName(); + } + + public void startRun() { + olClient.emit(ol.newRunEvent(START, newEventTime(), run, job, inputs, outputs)); + } + + public void endRun() { + endRun(COMPLETE); + } + + public void endRun(@NonNull OpenLineage.RunEvent.EventType eventType) { + olClient.emit(ol.newRunEvent(eventType, newEventTime(), run, job, inputs, outputs)); + } + + private static ZonedDateTime newEventTime() { + return Instant.now().atZone(ZoneId.of("America/Los_Angeles")); + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private OpenLineage ol; + private OpenLineageClient olClient; + + private OpenLineage.Run run; + private OpenLineage.Job job; + private List inputs; + private List outputs; + + private Builder() {} + + public Builder run(@NonNull OpenLineage.Run run) { + this.run = run; + return this; + } + + public Builder job(@NonNull OpenLineage.Job job) { + this.job = job; + return this; + } + + public Builder inputs(@Nullable List inputs) { + this.inputs = inputs; + return this; + } + + public Builder outputs(@Nullable List outputs) { + this.outputs = outputs; + return this; + } + + public ActiveRun build(@NonNull OpenLineage ol, @NonNull OpenLineageClient olClient) { + return new ActiveRun(ol, olClient, run, job, inputs, outputs); + } + } +} diff --git a/api/src/test/java/marquez/api/models/ApiModelGenerator.java b/api/src/test/java/marquez/api/models/ApiModelGenerator.java new file mode 100644 index 0000000000..5a961504f6 --- /dev/null +++ b/api/src/test/java/marquez/api/models/ApiModelGenerator.java @@ -0,0 +1,267 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api.models; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static graphql.com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.openlineage.client.OpenLineage.RunEvent.EventType.COMPLETE; +import static marquez.common.models.CommonModelGenerator.newDatasetName; +import static marquez.common.models.CommonModelGenerator.newDescription; +import static marquez.common.models.CommonModelGenerator.newFieldName; +import static marquez.common.models.CommonModelGenerator.newFieldType; +import static 
marquez.common.models.CommonModelGenerator.newJobName; +import static marquez.common.models.CommonModelGenerator.newNamespaceName; +import static marquez.common.models.CommonModelGenerator.newRunId; + +import com.google.common.collect.ImmutableList; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClient; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Set; +import java.util.stream.Stream; +import lombok.NonNull; +import marquez.Generator; + +/** Generates new instances for {@code marquez.api.models} with random values used for testing. */ +public final class ApiModelGenerator extends Generator { + private ApiModelGenerator() {} + + /** Returns a new {@link ActiveRun} object with a specified {@code namespace}. */ + public static ActiveRun newOlActiveRun( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final String namespace) { + return newOlActiveRunFor(ol, olClient, namespace, newJobName().getValue()); + } + + /** Returns a new {@link ActiveRun} object with a specified {@code namespace}. */ + public static ActiveRun newOlActiveRunFor( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final String namespace, + @NonNull final String jobName) { + return ActiveRun.builder() + .run(newRun(ol)) + .job(newJobWith(ol, namespace, jobName)) + .inputs(newInputs(ol, namespace)) + .outputs(newOutputs(ol, namespace)) + .build(ol, olClient); + } + + /** + * Returns new {@link ActiveRun} objects with a specified {@code namespace} and default {@code + * limit}. + */ + public static ImmutableList newOlActiveRuns( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final String namespace) { + return newOlActiveRunsFor(ol, olClient, namespace, newJobName().getValue(), 4); + } + + /** + * Returns new {@link ActiveRun} objects with a specified {@code namespace} and default {@code + * limit}. + */ + public static ImmutableList newOlActiveRunsFor( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final String namespace, + @NonNull final String jobName) { + return newOlActiveRunsFor(ol, olClient, namespace, jobName, 4); + } + + /** Returns new {@link ActiveRun} objects with a specified {@code namespace} and {@code limit}. */ + public static ImmutableList newOlActiveRunsFor( + @NonNull final OpenLineage ol, + @NonNull final OpenLineageClient olClient, + @NonNull final String namespace, + @NonNull final String jobName, + final int totalRunsForJob) { + return Stream.generate(() -> newOlActiveRunFor(ol, olClient, namespace, jobName)) + .limit(totalRunsForJob) + .collect(toImmutableList()); + } + + /** ... */ + public static Set newRunEvents( + @NonNull final OpenLineage ol, + @NonNull final Instant now, + @NonNull final String namespaceName, + @NonNull final String jobName, + final int limit) { + return Stream.generate(() -> newRunEvent(ol, now, namespaceName, jobName)) + .limit(limit) + .collect(toImmutableSet()); + } + + /** + * Returns a new {@link OpenLineage.Run} object; no {@code parent} run will be associated with + * {@code child} run. 
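+   *
+   * <p>A usage sketch (the namespace and job names below are illustrative):
+   *
+   * <pre>{@code
+   * OpenLineage ol = new OpenLineage(URI.create("https://test.com/test"));
+   * OpenLineage.RunEvent olEvent = newRunEvent(ol, Instant.now(), "my-namespace", "my-job");
+   * }</pre>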
+ */ + public static OpenLineage.RunEvent newRunEvent( + @NonNull final OpenLineage ol, + @NonNull final Instant now, + @NonNull final String namespaceName, + @NonNull final String jobName) { + return ol.newRunEventBuilder() + .eventType(COMPLETE) + .eventTime(now.atZone(ZoneId.of("America/Los_Angeles"))) + .run(newRun(ol)) + .job(newJobWith(ol, namespaceName, jobName)) + .inputs(newInputs(ol, namespaceName)) + .outputs(newOutputs(ol, namespaceName)) + .build(); + } + + /** + * Returns a new {@link OpenLineage.Run} object; no {@code parent} run will be associated with + * {@code child} run. + */ + public static Set newRuns(@NonNull final OpenLineage ol, final int limit) { + return Stream.generate(() -> newRun(ol)).limit(limit).collect(toImmutableSet()); + } + + /** + * Returns a new {@link OpenLineage.Run} object; no {@code parent} run will be associated with + * {@code child} run. + */ + public static OpenLineage.Run newRun(@NonNull final OpenLineage ol) { + return newRun(ol, false); + } + + /** + * Returns a new {@link OpenLineage.Run} object. A {@code parent} run will be associated with + * {@code child} run if {@code hasParentRun} is {@code true}; otherwise, the {@code child} run + * will not have a {@code parent} run. + */ + public static OpenLineage.Run newRun(@NonNull final OpenLineage ol, final boolean hasParentRun) { + return ol.newRun( + newRunId().getValue(), + ol.newRunFacetsBuilder() + .parent( + hasParentRun + ? ol.newParentRunFacetBuilder() + .run(newParentRun(ol)) + .job(newParentJob(ol)) + .build() + : null) + .nominalTime( + ol.newNominalTimeRunFacetBuilder() + .nominalStartTime(newNominalTime()) + .nominalEndTime(newNominalTime().plusHours(1)) + .build()) + .build()); + } + + /** Returns a new {@link OpenLineage.ParentRunFacetRun} object. */ + static OpenLineage.ParentRunFacetRun newParentRun(@NonNull final OpenLineage ol) { + return ol.newParentRunFacetRunBuilder().runId(newRunId().getValue()).build(); + } + + /** Returns a new {@link OpenLineage.Job} object with a specified {@code namespace}. */ + static OpenLineage.Job newJob(@NonNull final OpenLineage ol, @NonNull final String namespace) { + return newJobWith(ol, namespace, newJobName().getValue()); + } + + /** Returns a new {@link OpenLineage.Job} object with a specified {@code namespace}. */ + static OpenLineage.Job newJobWith( + @NonNull final OpenLineage ol, @NonNull final String namespace, @NonNull String jobName) { + return ol.newJobBuilder().namespace(namespace).name(jobName).build(); + } + + /** Returns a new {@link OpenLineage.ParentRunFacetJob} object. */ + static OpenLineage.ParentRunFacetJob newParentJob(@NonNull final OpenLineage ol) { + return ol.newParentRunFacetJobBuilder() + .namespace(newNamespaceName().getValue()) + .name(newJobName().getValue()) + .build(); + } + + /** ... */ + static List newInputs( + @NonNull final OpenLineage ol, @NonNull final String namespace) { + return newInputs(ol, namespace, 4, 2); + } + + /** ... */ + static List newInputs( + @NonNull final OpenLineage ol, + @NonNull final String namespace, + final int numOfInputs, + final int numOfFields) { + return Stream.generate( + () -> + ol.newInputDatasetBuilder() + .namespace(namespace) + .name(newDatasetName().getValue()) + .facets( + ol.newDatasetFacetsBuilder() + .schema(newDatasetSchema(ol, numOfFields)) + .build()) + .build()) + .limit(numOfInputs) + .collect(toImmutableList()); + } + + /** ... 
*/ + static List newOutputs( + @NonNull final OpenLineage ol, @NonNull final String namespace) { + return newOutputs(ol, namespace, 4, 1); + } + + /** ... */ + static List newOutputs( + @NonNull final OpenLineage ol, + @NonNull final String namespace, + final int numOfOutputs, + final int numOfFields) { + return Stream.generate( + () -> + ol.newOutputDatasetBuilder() + .namespace(namespace) + .name(newDatasetName().getValue()) + .facets( + ol.newDatasetFacetsBuilder() + .schema(newDatasetSchema(ol, numOfFields)) + .build()) + .build()) + .limit(numOfOutputs) + .collect(toImmutableList()); + } + + /** ... */ + static OpenLineage.SchemaDatasetFacet newDatasetSchema( + @NonNull final OpenLineage ol, final int numOfFields) { + return ol.newSchemaDatasetFacetBuilder().fields(newFields(ol, numOfFields)).build(); + } + + static List newFields( + @NonNull final OpenLineage ol, final int numOfFields) { + return Stream.generate( + () -> + ol.newSchemaDatasetFacetFieldsBuilder() + .name(newFieldName().getValue()) + .type(newFieldType()) + .description(newDescription()) + .build()) + .limit(numOfFields) + .collect(toImmutableList()); + } + + /** ... */ + static ZonedDateTime newNominalTime() { + return newNominalTimeWith(ZoneId.of("America/Los_Angeles")); + } + + /** ... */ + static ZonedDateTime newNominalTimeWith(@NonNull final ZoneId zoneId) { + return Instant.now().atZone(zoneId); + } +} diff --git a/api/src/test/java/marquez/common/models/CommonModelGenerator.java b/api/src/test/java/marquez/common/models/CommonModelGenerator.java index d6f00aaa09..fdd192a9ff 100644 --- a/api/src/test/java/marquez/common/models/CommonModelGenerator.java +++ b/api/src/test/java/marquez/common/models/CommonModelGenerator.java @@ -91,6 +91,10 @@ public static DatasetType newDatasetType() { return DatasetType.values()[newIdWithBound(DatasetType.values().length - 1)]; } + public static DatasetName newPhysicalDatasetName() { + return DatasetName.of("test_physical_dataset" + newId()); + } + public static Field newField() { return new Field(newFieldName(), newFieldType(), newTagNames(2), newDescription()); } diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 3fbaaf7fa1..6b396803bb 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -63,7 +63,6 @@ public static RunRow writeNewEvent( runArgsRow.getUuid(), now, now, - namespace.getUuid(), namespace.getName(), jobName, null); diff --git a/api/src/test/java/marquez/db/DbRetentionTest.java b/api/src/test/java/marquez/db/DbRetentionTest.java new file mode 100644 index 0000000000..4c6beb1799 --- /dev/null +++ b/api/src/test/java/marquez/db/DbRetentionTest.java @@ -0,0 +1,687 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static java.time.temporal.ChronoUnit.DAYS; +import static marquez.api.models.ApiModelGenerator.newRunEvents; +import static marquez.common.models.CommonModelGenerator.newJobName; +import static marquez.common.models.CommonModelGenerator.newNamespaceName; +import static marquez.db.models.DbModelGenerator.newDatasetRowWith; +import static marquez.db.models.DbModelGenerator.newDatasetRowsWith; +import static marquez.db.models.DbModelGenerator.newDatasetVersionRowWith; +import static marquez.db.models.DbModelGenerator.newDatasetVersionsRowWith; +import static marquez.db.models.DbModelGenerator.newJobRowWith; +import static 
marquez.db.models.DbModelGenerator.newJobRowsWith; +import static marquez.db.models.DbModelGenerator.newJobVersionRowWith; +import static marquez.db.models.DbModelGenerator.newJobVersionRowsWith; +import static marquez.db.models.DbModelGenerator.newNamespaceRow; +import static marquez.db.models.DbModelGenerator.newRunArgRow; +import static marquez.db.models.DbModelGenerator.newRunRowWith; +import static marquez.db.models.DbModelGenerator.newRunRowsWith; +import static marquez.db.models.DbModelGenerator.newSourceRow; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import io.openlineage.client.OpenLineage; +import java.net.URI; +import java.time.Instant; +import java.util.Set; +import java.util.UUID; +import marquez.db.exceptions.DbRetentionException; +import marquez.db.models.DatasetRow; +import marquez.db.models.DatasetVersionRow; +import marquez.db.models.JobRow; +import marquez.db.models.JobVersionRow; +import marquez.db.models.NamespaceRow; +import marquez.db.models.RunArgsRow; +import marquez.db.models.RunRow; +import marquez.db.models.SourceRow; +import org.jdbi.v3.core.Handle; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.com.google.common.collect.ImmutableSet; + +/** The test suite for {@link DbRetention}. */ +@Tag("IntegrationTests") +public class DbRetentionTest extends DbTest { + private static final int NUMBER_OF_ROWS_PER_BATCH = 10; + private static final int RETENTION_DAYS = 30; + private static final boolean DRY_RUN = true; + private static final Instant OLDER_THAN_X_DAYS = Instant.now().minus(RETENTION_DAYS + 1, DAYS); + private static final Instant LAST_X_DAYS = Instant.now().minus(RETENTION_DAYS - 1, DAYS); + + @Test + public void testRetentionOnDbOrErrorWithJobsOlderThanXDays() { + // (1) Add namespace. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + + // (2) Add jobs older than X days. + final Set rowsOlderThanXDays = + DB.upsertAll( + newJobRowsWith(OLDER_THAN_X_DAYS, namespaceRow.getUuid(), namespaceRow.getName(), 4)); + + // (3) Add jobs within last X days. + final Set rowsLastXDays = + DB.upsertAll( + newJobRowsWith(LAST_X_DAYS, namespaceRow.getUuid(), namespaceRow.getName(), 2)); + + // (4) Apply retention policy as dry run on jobs older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS, DRY_RUN); + // (5) Query 'jobs' table for rows. We want to ensure: jobs older than X days + // have not deleted; jobs within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply dry run", e); + } + + // (6) Apply retention policy on jobs older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (7) Query 'jobs' table for rows deleted. We want to ensure: jobs older than X days + // have been deleted; jobs within last X days have not been deleted. 
+ try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void testRetentionOnDbOrErrorWithJobVersionsOlderThanXDays() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add dataset (as inputs) associated with job version. + final Set datasetsAsInput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 2)); + + // (3) Add dataset (as outputs) associated with job version. + final Set datasetsAsOutput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (4) Use any output dataset for job versions to obtain namespace and associate with job. + final DatasetRow datasetAsOutput = datasetsAsOutput.stream().findAny().orElseThrow(); + final UUID namespaceUuid = datasetAsOutput.getNamespaceUuid(); + final String namespaceName = datasetAsOutput.getNamespaceName(); + + // (5) Add job. + final JobRow jobRow = DB.upsert(newJobRowWith(namespaceUuid, namespaceName)); + + // (6) Add job versions older than X days associated with job. + final Set rowsOlderThanXDays = + DB.upsertAll( + newJobVersionRowsWith( + OLDER_THAN_X_DAYS, + namespaceUuid, + namespaceName, + jobRow.getUuid(), + jobRow.getName(), + datasetsAsInput, + datasetsAsOutput, + 4)); + + // (7) Add job versions within last X days associated with job. + final Set rowsLastXDays = + DB.upsertAll( + newJobVersionRowsWith( + LAST_X_DAYS, + namespaceUuid, + namespaceName, + jobRow.getUuid(), + jobRow.getName(), + datasetsAsInput, + datasetsAsOutput, + 2)); + + // (8) Apply retention policy as dry run on job versions older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS, DRY_RUN); + // (9) Query 'job versions' table for rows. We want to ensure: job versions older + // than X days have not been deleted; job versions within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply dry run", e); + } + + // (10) Apply retention policy on job versions older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (11) Query 'job versions' table for rows deleted. We want to ensure: job versions older + // than X days have been deleted; job versions within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void testRetentionOnDbOrErrorWithRunsOlderThanXDays() { + // (1) Add namespace and source. 
+ final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add dataset (as inputs) associated with job. + final Set datasetsAsInput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 2)); + + // (3) Add dataset (as outputs) associated with job. + final Set datasetsAsOutput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (4) Use any output dataset for run to obtain namespace and associate with job. + final DatasetRow datasetAsOutput = datasetsAsOutput.stream().findAny().orElseThrow(); + final UUID namespaceUuid = datasetAsOutput.getNamespaceUuid(); + final String namespaceName = datasetAsOutput.getNamespaceName(); + + // (5) Add version for job. + final JobRow jobRow = DB.upsert(newJobRowWith(namespaceUuid, namespaceName)); + final JobVersionRow jobVersionRow = + DB.upsert( + newJobVersionRowWith( + namespaceUuid, + namespaceName, + jobRow.getUuid(), + jobRow.getName(), + datasetsAsInput, + datasetsAsOutput)); + + // (6) Add args for run. + final RunArgsRow runArgsRow = DB.upsert(newRunArgRow()); + + // (7) Add runs older than X days. + final Set rowsOlderThanXDays = + DB.upsertAll( + newRunRowsWith( + OLDER_THAN_X_DAYS, + jobRow.getUuid(), + jobVersionRow.getUuid(), + runArgsRow.getUuid(), + 4)); + + // (8) Add runs within last X days. + final Set rowsLastXDays = + DB.upsertAll( + newRunRowsWith( + LAST_X_DAYS, jobRow.getUuid(), jobVersionRow.getUuid(), runArgsRow.getUuid(), 2)); + + // (9) Apply retention policy as dry run on runs older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS, DRY_RUN); + // (10) Query 'runs' table for rows. We want to ensure: runs older than X days have not been + // deleted; runs within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply dry run", e); + } + + // (11) Apply retention policy on runs older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (12) Query 'runs' table for rows deleted. We want to ensure: runs older than X days have + // been deleted; runs within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void testRetentionOnDbOrErrorWithDatasetsOlderThanXDays() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add datasets older than X days. + final Set rowsOlderThanXDays = + DB.upsertAll( + newDatasetRowsWith( + OLDER_THAN_X_DAYS, + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (3) Add datasets within last X days. 
+ final Set rowsLastXDays = + DB.upsertAll( + newDatasetRowsWith( + LAST_X_DAYS, + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 2)); + + // (4) Apply retention policy as dry run on datasets older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS, DRY_RUN); + // (5) Query 'datasets' table for rows. We want to ensure: datasets older than X days + // have not been deleted; datasets within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply dry run", e); + } + + // (6) Apply retention policy on datasets older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (7) Query 'datasets' table for rows deleted. We want to ensure: datasets older than X days + // have been deleted; datasets within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void + testRetentionOnDbOrErrorWithDatasetsOlderThanXDays_skipIfDatasetAsInputOrOutputForJobVersion() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add datasets older than X days not associated with a job version; therefore, datasets + // will be deleted when applying retention policy. + final Set rowsOlderThanXDays = + DB.upsertAll( + newDatasetRowsWith( + OLDER_THAN_X_DAYS, + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (3) Add datasets (as inputs) older than X days associated with a job version; therefore, + // datasets will be skipped when applying retention policy. + final Set rowsOlderThanXDaysAsInput = + DB.upsertAll( + newDatasetRowsWith( + OLDER_THAN_X_DAYS, + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 2)); + + // (4) Add datasets (as outputs) within last X days associated with a job version; therefore, + // datasets will be skipped when applying retention policy. + final Set rowsLastXDaysAsOutput = + DB.upsertAll( + newDatasetRowsWith( + LAST_X_DAYS, + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (5) Use any output dataset to obtain namespace and associate with job. + final DatasetRow rowLastXDaysAsOutput = rowsLastXDaysAsOutput.stream().findAny().orElseThrow(); + final UUID namespaceUuid = rowLastXDaysAsOutput.getNamespaceUuid(); + final String namespaceName = rowLastXDaysAsOutput.getNamespaceName(); + + // (6) Add job and associate with job version; the job version will have input and output + // datasets older than X days and within last X days, respectively. 
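+    //     Because these datasets remain referenced as a job version's inputs/outputs, the
+    //     retention policy is expected to skip them; only the unreferenced datasets from step (2)
+    //     should be removed.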
+ final JobRow jobRow = DB.upsert(newJobRowWith(namespaceUuid, namespaceName)); + DB.upsert( + newJobVersionRowWith( + namespaceUuid, + namespaceName, + jobRow.getUuid(), + jobRow.getName(), + rowsOlderThanXDaysAsInput, + rowsLastXDaysAsOutput)); + + // (7) Apply retention policy on datasets older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (8) Query 'datasets' table for rows deleted. We want to ensure: datasets older than X days + // not associated with a job version have been deleted; datasets older than X days associated + // with a job version have not been deleted; datasets within last X days associated with a job + // version have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDaysAsInput)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDaysAsOutput)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add dataset. + final DatasetRow datasetRow = + DB.upsert( + newDatasetRowWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName())); + + // (3) Add versions for dataset older than X days. + final Set rowsOlderThanXDays = + DB.upsertAll( + newDatasetVersionsRowWith( + OLDER_THAN_X_DAYS, + datasetRow.getUuid(), + datasetRow.getName(), + datasetRow.getNamespaceName(), + 4)); + + // (4) Add versions for dataset within last X days. + final Set rowsLastXDays = + DB.upsertAll( + newDatasetVersionsRowWith( + LAST_X_DAYS, + datasetRow.getUuid(), + datasetRow.getName(), + datasetRow.getNamespaceName(), + 2)); + + // (5) Apply retention policy on dataset versions older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (6) Query 'dataset versions' table for rows deleted. We want to ensure: dataset versions + // older than X days have been deleted; datasets within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void + testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIfVersionAsCurrentForDataset() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + final DatasetRow datasetRow = + DB.upsert( + newDatasetRowWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName())); + + // (2) Add dataset versions older than X days. + final Set rowsOlderThanXDays = + DB.upsertAll( + newDatasetVersionsRowWith( + OLDER_THAN_X_DAYS, + datasetRow.getUuid(), + datasetRow.getName(), + datasetRow.getNamespaceName(), + 4)); + + // (3) Add dataset versions within last X days. 
+ final Set rowsLastXDays = + DB.upsertAll( + newDatasetVersionsRowWith( + LAST_X_DAYS, + datasetRow.getUuid(), + datasetRow.getName(), + datasetRow.getNamespaceName(), + 2)); + + // (4) Add dataset version older than X days associated with dataset (as current version); + // therefore, the dataset version will be skipped when applying retention policy. + final DatasetVersionRow rowOlderThanXDaysAsCurrent = + DB.upsert( + newDatasetVersionRowWith( + LAST_X_DAYS, + datasetRow.getUuid(), + datasetRow.getName(), + datasetRow.getNamespaceName()), + true); + + // (5) Apply retention policy on dataset versions older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (6) Query 'dataset versions' table for rows deleted. We want to ensure: dataset versions + // older than X days have been deleted; dataset versions within last X days have not been + // deleted; dataset versions older than X days associated with a dataset (as current version) + // has not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDays)).isTrue(); + assertThat(DbTestUtils.rowExists(handle, rowOlderThanXDaysAsCurrent)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void + testRetentionOnDbOrErrorWithDatasetVersionsOlderThanXDays_skipIfVersionAsInputForRun() { + // (1) Add namespace and source. + final NamespaceRow namespaceRow = DB.upsert(newNamespaceRow()); + final SourceRow sourceRow = DB.upsert(newSourceRow()); + + // (2) Add dataset (as inputs) associated with job. + final Set datasetsAsInput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 2)); + + // (3) Add dataset (as outputs) associated with job. + final Set datasetsAsOutput = + DB.upsertAll( + newDatasetRowsWith( + namespaceRow.getUuid(), + namespaceRow.getName(), + sourceRow.getUuid(), + sourceRow.getName(), + 4)); + + // (4) Add dataset versions older than X days for each input datasets associated with run. + final ImmutableSet.Builder builderRowsOlderThanXDaysAsInput = + ImmutableSet.builder(); + for (final DatasetRow rowAsInput : datasetsAsInput) { + builderRowsOlderThanXDaysAsInput.addAll( + DB.upsertAll( + newDatasetVersionsRowWith( + OLDER_THAN_X_DAYS, + rowAsInput.getUuid(), + rowAsInput.getName(), + rowAsInput.getNamespaceName(), + 4))); + } + final Set rowsOlderThanXDaysAsInput = + builderRowsOlderThanXDaysAsInput.build(); + + // (5) Add dataset versions within last X days for each output datasets associated with run. + final ImmutableSet.Builder builderRowsLastXDaysAsOutput = + ImmutableSet.builder(); + for (final DatasetRow rowAsOutput : datasetsAsOutput) { + builderRowsLastXDaysAsOutput.addAll( + DB.upsertAll( + newDatasetVersionsRowWith( + LAST_X_DAYS, + rowAsOutput.getUuid(), + rowAsOutput.getName(), + rowAsOutput.getNamespaceName(), + 2))); + } + final Set rowsLastXDaysAsOutput = builderRowsLastXDaysAsOutput.build(); + + // (6) Use any output dataset for run to obtain namespace and associate with job. + final DatasetRow datasetAsOutput = datasetsAsOutput.stream().findAny().orElseThrow(); + final UUID namespaceUuid = datasetAsOutput.getNamespaceUuid(); + final String namespaceName = datasetAsOutput.getNamespaceName(); + + // (7) Add version for job. 
+ final JobRow jobRow = DB.upsert(newJobRowWith(namespaceUuid, namespaceName)); + final JobVersionRow jobVersionRow = + DB.upsert( + newJobVersionRowWith( + namespaceUuid, + namespaceName, + jobRow.getUuid(), + jobRow.getName(), + datasetsAsInput, + datasetsAsOutput)); + + // (8) Add run and associate with job and version. + final RunArgsRow runArgsRow = DB.upsert(newRunArgRow()); + final RunRow runRow = + newRunRowWith(jobRow.getUuid(), jobVersionRow.getUuid(), runArgsRow.getUuid()); + + // (9) Add dataset version (as input) older than X days associated with run; + // therefore, the dataset version will be skipped when applying retention policy. + final DatasetRow datasetAsInput = datasetsAsInput.stream().findAny().orElseThrow(); + final DatasetVersionRow rowOlderThanXDaysAsInput = + DB.upsert( + newDatasetVersionRowWith( + LAST_X_DAYS, + datasetAsInput.getUuid(), + datasetAsInput.getName(), + datasetAsInput.getNamespaceName(), + runRow.getUuid())); + DB.upsertWith(runRow, rowOlderThanXDaysAsInput.getUuid()); + + // (10) Apply retention policy on dataset versions older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (11) Query 'dataset versions' table for rows deleted. We want to ensure: dataset versions + // older than X days associated with a run (as input) has not been deleted; dataset versions + // older than X days have been deleted; dataset versions within last X days have not been + // deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.rowExists(handle, rowOlderThanXDaysAsInput)).isTrue(); + assertThat(DbTestUtils.rowsExist(handle, rowsOlderThanXDaysAsInput)).isFalse(); + assertThat(DbTestUtils.rowsExist(handle, rowsLastXDaysAsOutput)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } + + @Test + public void testRetentionOnDbOrErrorWithOlEventsOlderThanXDays() { + // (1) Configure OL. + final URI olProducer = URI.create("https://test.com/test"); + final OpenLineage ol = new OpenLineage(olProducer); + + // (2) Add namespace and job for OL events. + final String namespaceName = newNamespaceName().getValue(); + final String jobName = newJobName().getValue(); + + // (3) Add OL events older than X days. + final Set olEventsOlderThanXDays = + newRunEvents(ol, OLDER_THAN_X_DAYS, namespaceName, jobName, 4); + DB.insertAll(olEventsOlderThanXDays); + + // (4) Add OL events within last X days. + final Set olEventsLastXDays = + newRunEvents(ol, LAST_X_DAYS, namespaceName, jobName, 2); + DB.insertAll(olEventsLastXDays); + + // (5) Apply retention policy as dry run on OL events older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS, DRY_RUN); + // (6) Query 'lineage events' table for events. We want to ensure: OL events older than X + // days have not been deleted; OL events within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.olEventsExist(handle, olEventsOlderThanXDays)).isTrue(); + assertThat(DbTestUtils.olEventsExist(handle, olEventsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply dry run", e); + } + + // (7) Apply retention policy on OL events older than X days. + try { + DbRetention.retentionOnDbOrError( + jdbiExtension.getJdbi(), NUMBER_OF_ROWS_PER_BATCH, RETENTION_DAYS); + // (8) Query 'lineage events' table for events deleted. 
We want to ensure: OL events older + // than X days have been deleted; OL events within last X days have not been deleted. + try (final Handle handle = DB.open()) { + assertThat(DbTestUtils.olEventsExist(handle, olEventsOlderThanXDays)).isFalse(); + assertThat(DbTestUtils.olEventsExist(handle, olEventsLastXDays)).isTrue(); + } + } catch (DbRetentionException e) { + fail("failed to apply retention policy", e); + } + } +} diff --git a/api/src/test/java/marquez/db/DbTest.java b/api/src/test/java/marquez/db/DbTest.java new file mode 100644 index 0000000000..15ec7e2c31 --- /dev/null +++ b/api/src/test/java/marquez/db/DbTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import javax.sql.DataSource; +import org.jdbi.v3.jackson2.Jackson2Plugin; +import org.jdbi.v3.postgres.PostgresPlugin; +import org.jdbi.v3.sqlobject.SqlObjectPlugin; +import org.jdbi.v3.testing.junit5.JdbiExtension; +import org.jdbi.v3.testing.junit5.tc.JdbiTestcontainersExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +/** + * The base class for interactions with test database. A {@code postgres} container is managed + * automatically and started only once for a given test suite. The {@code postgres} container will + * be shared between test methods. + * + *

After the underlying {@code postgres} container starts, but before a given test suite is + * executed, the latest {@code flyway} migrations for Marquez will be applied to the database using + * {@link DbMigration#migrateDbOrError(DataSource)}. When querying the test database, we recommend + * using the {@code DB} wrapper, but you can also obtain a {@code jdbi} instance directly via {@link + * JdbiExtension#getJdbi()}}. + */ +@Tag("DataAccessTests") +@Testcontainers +class DbTest { + private static final DockerImageName POSTGRES_12_1 = DockerImageName.parse("postgres:12.1"); + + @Container + private static final PostgreSQLContainer DB_CONTAINER = + new PostgreSQLContainer<>(POSTGRES_12_1); + + // Defined statically to significantly improve overall test execution. + @RegisterExtension + static final JdbiExtension jdbiExtension = + JdbiTestcontainersExtension.instance(DB_CONTAINER) + .withPlugin(new SqlObjectPlugin()) + .withPlugin(new PostgresPlugin()) + .withPlugin(new Jackson2Plugin()) + .withInitializer( + (source, handle) -> { + // Apply migrations. + DbMigration.migrateDbOrError(source); + }); + + // Wraps test database connection. + static TestingDb DB; + + @BeforeAll + public static void setUpOnce() { + // Wrap jdbi configured for running container. + DB = TestingDb.newInstance(jdbiExtension.getJdbi()); + } +} diff --git a/api/src/test/java/marquez/db/DbTestUtils.java b/api/src/test/java/marquez/db/DbTestUtils.java index 01ede91b58..31cf24fa78 100644 --- a/api/src/test/java/marquez/db/DbTestUtils.java +++ b/api/src/test/java/marquez/db/DbTestUtils.java @@ -22,15 +22,18 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.openlineage.client.OpenLineage; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import lombok.NonNull; import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; @@ -39,6 +42,7 @@ import marquez.common.models.NamespaceName; import marquez.common.models.RunState; import marquez.db.models.DatasetRow; +import marquez.db.models.DatasetVersionRow; import marquez.db.models.ExtendedJobVersionRow; import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; @@ -52,6 +56,7 @@ import marquez.service.models.Run; import marquez.service.models.RunMeta; import marquez.service.models.ServiceModelGenerator; +import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; /** Static utility methods for inserting and interacting with rows in the database. */ @@ -248,7 +253,6 @@ static RunRow newRun( runArgsUuid, newTimestamp(), newTimestamp(), - namespaceUuid, namespaceName, jobName, jobLocation); @@ -326,4 +330,87 @@ public static Stream> streamResults(ResultSet resultSet) { }) .takeWhile(Predicates.notNull()); } + + public static boolean rowExists(@NonNull final Handle handle, final @NonNull T rowToVerify) { + return rowsExist(handle, ImmutableSet.of(rowToVerify)); + } + + /** Returns {@code true} ... 
*/ + public static boolean rowsExist( + @NonNull final Handle handle, final @NonNull Set rowsToVerify) { + // TODO (wslulciuc): Add interface for rows to allow for Row.getUuid() + if (rowsToVerify.stream().anyMatch(DatasetRow.class::isInstance)) { + return rowsArePresentIn( + handle, + "datasets", + rowsToVerify.stream() + .map(DatasetRow.class::cast) + .map(DatasetRow::getUuid) + .collect(toImmutableSet())); + } else if (rowsToVerify.stream().anyMatch(DatasetVersionRow.class::isInstance)) { + return rowsArePresentIn( + handle, + "dataset_versions", + rowsToVerify.stream() + .map(DatasetVersionRow.class::cast) + .map(DatasetVersionRow::getUuid) + .collect(toImmutableSet())); + } else if (rowsToVerify.stream().anyMatch(JobRow.class::isInstance)) { + return rowsArePresentIn( + handle, + "jobs", + rowsToVerify.stream() + .map(JobRow.class::cast) + .map(JobRow::getUuid) + .collect(toImmutableSet())); + } else if (rowsToVerify.stream().anyMatch(JobVersionRow.class::isInstance)) { + return rowsArePresentIn( + handle, + "job_versions", + rowsToVerify.stream() + .map(JobVersionRow.class::cast) + .map(JobVersionRow::getUuid) + .collect(toImmutableSet())); + } else if (rowsToVerify.stream().anyMatch(RunRow.class::isInstance)) { + return rowsArePresentIn( + handle, + "runs", + rowsToVerify.stream() + .map(RunRow.class::cast) + .map(RunRow::getUuid) + .collect(toImmutableSet())); + } + throw new IllegalArgumentException(); + } + + /** Returns {@code true} ... */ + private static boolean rowsArePresentIn( + @NonNull final Handle handle, + @NonNull final String uuidsForRowsExistsInTable, + @NonNull final Set uuidsForRowsToVerify) { + return handle + .createQuery( + "SELECT EXISTS (SELECT 1 FROM " + + uuidsForRowsExistsInTable + + " WHERE uuid IN ())") + .bindList("uuidsForRowsToVerify", uuidsForRowsToVerify) + .mapTo(Boolean.class) + .one(); + } + + /** Returns {@code true} ... 
*/ + public static boolean olEventsExist( + @NonNull final Handle handle, @NonNull final Set olEventsToVerify) { + final Set runUuidsToVerify = + olEventsToVerify.stream() + .map(OpenLineage.RunEvent::getRun) + .map(OpenLineage.Run::getRunId) + .collect(toImmutableSet()); + return handle + .createQuery( + "SELECT EXISTS (SELECT 1 FROM lineage_events WHERE run_uuid IN ())") + .bindList("runUuidsToVerify", runUuidsToVerify) + .mapTo(Boolean.class) + .one(); + } } diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 102c2c63ce..9eb1fc2bcd 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -224,7 +224,6 @@ public void updateRowWithNullNominalTimeDoesNotUpdateNominalTime() { row.getRunArgsUuid(), null, null, - namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), null); @@ -256,7 +255,6 @@ public void updateRowWithExternalId() { row.getRunArgsUuid(), null, null, - namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), null); @@ -271,7 +269,6 @@ public void updateRowWithExternalId() { row.getRunArgsUuid(), null, null, - namespaceRow.getUuid(), namespaceRow.getName(), jobRow.getName(), null); diff --git a/api/src/test/java/marquez/db/TestingDb.java b/api/src/test/java/marquez/db/TestingDb.java new file mode 100644 index 0000000000..0ccb2af65f --- /dev/null +++ b/api/src/test/java/marquez/db/TestingDb.java @@ -0,0 +1,246 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static marquez.common.models.CommonModelGenerator.newFields; + +import com.google.common.collect.ImmutableSet; +import io.openlineage.client.OpenLineage; +import java.time.Instant; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import javax.annotation.Nullable; +import lombok.NonNull; +import marquez.common.models.DatasetType; +import marquez.common.models.JobType; +import marquez.db.models.DatasetRow; +import marquez.db.models.DatasetVersionRow; +import marquez.db.models.JobRow; +import marquez.db.models.JobVersionRow; +import marquez.db.models.NamespaceRow; +import marquez.db.models.RunArgsRow; +import marquez.db.models.RunRow; +import marquez.db.models.SourceRow; +import org.jdbi.v3.core.Handle; +import org.jdbi.v3.core.Jdbi; + +/** + * Forwarding wrapper around an open connection to test database; utility methods {@code upsert()} + * and {@code upsertAll()} implementations are intended for use in testing only. When querying the + * test database (or opening a new {@link Handle}), interactions are delegated to underlying {@code + * jdbi}. + */ +final class TestingDb { + private final Jdbi delegate; + + private static final Instant NOW = Instant.now(); + + private TestingDb(@NonNull final Jdbi delegate) { + this.delegate = delegate; + } + + /** Returns a new {@code TestingDb} object with the specified {@code jdbi} for delegation. */ + static TestingDb newInstance(@NonNull final Jdbi delegate) { + return new TestingDb(delegate); + } + + /** Execute {@code UPSERT} for the specified {@link NamespaceRow} object. */ + NamespaceRow upsert(@NonNull NamespaceRow row) { + return delegate + .onDemand(NamespaceDao.class) + .upsertNamespaceRow( + row.getUuid(), + row.getCreatedAt(), + row.getName(), + row.getCurrentOwnerName(), + row.getDescription().orElse(null)); + } + + /** Execute {@code UPSERT} for the specified {@link SourceRow} object. 
+  SourceRow upsert(@NonNull SourceRow row) {
+    return delegate
+        .onDemand(SourceDao.class)
+        .upsert(
+            row.getUuid(),
+            row.getType(),
+            row.getCreatedAt(),
+            row.getName(),
+            row.getConnectionUrl());
+  }
+
+  @SuppressWarnings("unchecked")
+  <T> Set<T> upsertAll(@NonNull Object... rows) {
+    // Wrap the rows in a set before delegating, rather than recursing into this overload.
+    return (Set<T>) upsertAll(ImmutableSet.copyOf(rows));
+  }
+
+  /** ... */
+  <T> Set<T> upsertAll(@NonNull Set<T> rows) {
+    ImmutableSet.Builder<T> upserted = new ImmutableSet.Builder<>();
+    rows.forEach(
+        row -> {
+          upserted.add((T) upsert(row));
+        });
+    return upserted.build();
+  }
+
+  private Object upsert(Object row) {
+    if (row instanceof DatasetRow) {
+      return upsert((DatasetRow) row);
+    } else if (row instanceof DatasetVersionRow) {
+      return upsert((DatasetVersionRow) row);
+    } else if (row instanceof JobRow) {
+      return upsert((JobRow) row);
+    } else if (row instanceof JobVersionRow) {
+      return upsert((JobVersionRow) row);
+    } else if (row instanceof RunRow) {
+      return upsert((RunRow) row);
+    }
+    throw new IllegalArgumentException();
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link DatasetRow} object. */
+  DatasetRow upsert(@NonNull DatasetRow row) {
+    return delegate
+        .onDemand(DatasetDao.class)
+        .upsert(
+            row.getUuid(),
+            DatasetType.valueOf(row.getType()),
+            row.getCreatedAt(),
+            row.getNamespaceUuid(),
+            row.getNamespaceName(),
+            row.getSourceUuid(),
+            row.getSourceName(),
+            row.getName(),
+            row.getPhysicalName(),
+            row.getDescription().orElse(null),
+            row.isDeleted());
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link DatasetVersionRow} object. */
+  DatasetVersionRow upsert(@NonNull DatasetVersionRow row) {
+    return upsert(row, false);
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link DatasetVersionRow} object. */
+  DatasetVersionRow upsert(@NonNull DatasetVersionRow row, boolean isCurrentVersion) {
+    final DatasetVersionRow upserted =
+        delegate
+            .onDemand(DatasetVersionDao.class)
+            .upsert(
+                row.getUuid(),
+                row.getCreatedAt(),
+                row.getDatasetUuid(),
+                row.getVersion(),
+                row.getRunUuid().orElseThrow(),
+                Columns.toPgObject(newFields(4)),
+                row.getNamespaceName(),
+                row.getDatasetName(),
+                row.getLifecycleState());
+
+    // ...
+    if (isCurrentVersion) {
+      delegate
+          .onDemand(DatasetDao.class)
+          .updateVersion(row.getDatasetUuid(), row.getCreatedAt(), row.getVersion());
+    }
+    return upserted;
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link RunArgsRow} object. */
+  RunArgsRow upsert(@NonNull RunArgsRow row) {
+    return delegate
+        .onDemand(RunArgsDao.class)
+        .upsertRunArgs(row.getUuid(), row.getCreatedAt(), row.getArgs(), row.getChecksum());
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link JobRow} object. */
+  JobRow upsert(@NonNull JobRow row) {
+    return delegate
+        .onDemand(JobDao.class)
+        .upsertJob(
+            row.getUuid(),
+            JobType.valueOf(row.getType()),
+            row.getCreatedAt(),
+            row.getNamespaceUuid(),
+            row.getNamespaceName(),
+            row.getName(),
+            row.getDescription().orElse(null),
+            row.getLocation(),
+            null,
+            null);
+  }
+
+  /** Execute {@code UPSERT} for the specified {@link JobVersionRow} object. */
+  JobVersionRow upsert(@NonNull JobVersionRow row) {
+    final JobVersionDao dao = delegate.onDemand(JobVersionDao.class);
+    final JobVersionRow upserted =
+        dao.upsertJobVersion(
+            row.getUuid(),
+            row.getCreatedAt(),
+            row.getJobUuid(),
+            row.getLocation().orElse(null),
+            row.getVersion(),
+            row.getJobName(),
+            row.getNamespaceUuid(),
+            row.getNamespaceName());
+    row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in));
+    row.getOutputUuids().forEach(out -> dao.upsertOutputDatasetFor(row.getUuid(), out));
+    // ...
+    delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid());
+    return upserted;
+  }
+
+  RunRow upsert(@NonNull RunRow row) {
+    return upsertWith(row, null);
+  }
+
+  RunRow upsertWith(@NonNull RunRow row, @Nullable final UUID datasetVersionUuidAsInput) {
+    final RunDao dao = delegate.onDemand(RunDao.class);
+    final RunRow upserted =
+        dao.upsert(
+            row.getUuid(),
+            row.getParentRunUuid().orElse(null),
+            null, // ...
+            row.getCreatedAt(),
+            row.getJobUuid(),
+            row.getJobVersionUuid().orElse(null),
+            row.getRunArgsUuid(),
+            row.getNominalStartTime().orElse(null),
+            row.getNominalEndTime().orElse(null),
+            row.getNamespaceName(),
+            row.getJobName(),
+            null // ...
+            );
+    // ...
+    Optional.ofNullable(datasetVersionUuidAsInput)
+        .ifPresent(uuidOfInput -> dao.updateInputMapping(row.getUuid(), uuidOfInput));
+    return upserted;
+  }
+
+  void insertAll(@NonNull Set<OpenLineage.RunEvent> olEvents) {
+    for (final OpenLineage.RunEvent olEvent : olEvents) {
+      insert(olEvent);
+    }
+  }
+
+  void insert(@NonNull OpenLineage.RunEvent olEvent) {
+    delegate
+        .onDemand(OpenLineageDao.class)
+        .createLineageEvent(
+            olEvent.getEventType().toString(),
+            olEvent.getEventTime().toInstant(),
+            olEvent.getRun().getRunId(),
+            olEvent.getJob().getName(),
+            olEvent.getJob().getNamespace(),
+            Columns.toPgObject(olEvent),
+            olEvent.getProducer().toASCIIString());
+  }
+
+  /** Obtain a new {@link Handle} by delegating to the underlying {@code jdbi}. */
+  Handle open() {
+    return delegate.open();
+  }
+}
diff --git a/api/src/test/java/marquez/db/models/DbModelGenerator.java b/api/src/test/java/marquez/db/models/DbModelGenerator.java
index bb1f815847..fb16124f77 100644
--- a/api/src/test/java/marquez/db/models/DbModelGenerator.java
+++ b/api/src/test/java/marquez/db/models/DbModelGenerator.java
@@ -6,15 +6,32 @@
 package marquez.db.models;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static graphql.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static marquez.common.models.CommonModelGenerator.newConnectionUrl;
+import static marquez.common.models.CommonModelGenerator.newDatasetName;
+import static marquez.common.models.CommonModelGenerator.newDatasetType;
+import static marquez.common.models.CommonModelGenerator.newDbSourceType;
 import static marquez.common.models.CommonModelGenerator.newDescription;
+import static marquez.common.models.CommonModelGenerator.newJobName;
+import static marquez.common.models.CommonModelGenerator.newJobType;
+import static marquez.common.models.CommonModelGenerator.newLifecycleState;
+import static marquez.common.models.CommonModelGenerator.newLocation;
 import static marquez.common.models.CommonModelGenerator.newNamespaceName;
 import static marquez.common.models.CommonModelGenerator.newOwnerName;
+import static marquez.common.models.CommonModelGenerator.newPhysicalDatasetName;
+import static marquez.common.models.CommonModelGenerator.newRunId;
+import static marquez.common.models.CommonModelGenerator.newSourceName;
+import static marquez.common.models.CommonModelGenerator.newVersion;
+import static marquez.service.models.ServiceModelGenerator.newRunArgs;
 
 import java.time.Instant;
-import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Stream;
+import lombok.NonNull;
 import marquez.Generator;
+import marquez.common.Utils;
 import marquez.common.models.JobId;
 import marquez.common.models.JobName;
 import marquez.common.models.NamespaceName;
@@ -24,26 +41,297 @@ public final class DbModelGenerator extends Generator {
 
   private DbModelGenerator() {}
 
+  private static final Instant NOW = Instant.now();
+
   /** Returns new {@link NamespaceRow} objects with a specified {@code limit}. */
-  public static List<NamespaceRow> newNamespaceRows(int limit) {
+  public static Set<NamespaceRow> newNamespaceRows(final int limit) {
     return Stream.generate(DbModelGenerator::newNamespaceRow)
         .limit(limit)
-        .collect(toImmutableList());
+        .collect(toImmutableSet());
   }
 
-  /** Returns a new {@link NamespaceRow} object. */
+  /** Returns a new {@link NamespaceRow} object with defaults. */
   public static NamespaceRow newNamespaceRow() {
-    final Instant now = newTimestamp();
     return new NamespaceRow(
         newRowUuid(),
-        now,
-        now,
+        NOW,
+        NOW,
         newNamespaceName().getValue(),
         newDescription(),
         newOwnerName().getValue(),
         false);
   }
 
+  /** Returns a new {@link SourceRow} object with defaults. */
+  public static SourceRow newSourceRow() {
+    return new SourceRow(
+        newRowUuid(),
+        newDbSourceType().getValue(),
+        NOW,
+        NOW,
+        newSourceName().getValue(),
+        newConnectionUrl().toASCIIString(),
+        newDescription());
+  }
+
+  public static DatasetRow newDatasetRow() {
+    final NamespaceRow namespaceRow = newNamespaceRow();
+    final SourceRow sourceRow = newSourceRow();
+    return new DatasetRow(
+        newRowUuid(),
+        newDatasetType().name(),
+        NOW,
+        NOW,
+        namespaceRow.getUuid(),
+        namespaceRow.getName(),
+        sourceRow.getUuid(),
+        sourceRow.getName(),
+        newDatasetName().getValue(),
+        newPhysicalDatasetName().getValue(),
+        null,
+        newDescription(),
+        null,
+        false);
+  }
+
+  public static Set<DatasetRow> newDatasetRowsWith(
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID sourceUuid,
+      @NonNull final String sourceName,
+      final int limit) {
+    return Stream.generate(
+            () -> newDatasetRowWith(NOW, namespaceUuid, namespaceName, sourceUuid, sourceName))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static Set<DatasetRow> newDatasetRowsWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID sourceUuid,
+      @NonNull final String sourceName,
+      final int limit) {
+    return Stream.generate(
+            () -> newDatasetRowWith(now, namespaceUuid, namespaceName, sourceUuid, sourceName))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static DatasetRow newDatasetRowWith(
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID sourceUuid,
+      @NonNull final String sourceName) {
+    return newDatasetRowWith(NOW, namespaceUuid, namespaceName, sourceUuid, sourceName);
+  }
+
+  public static DatasetRow newDatasetRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID sourceUuid,
+      @NonNull final String sourceName) {
+    return new DatasetRow(
+        newRowUuid(),
+        newDatasetType().name(),
+        now,
+        now,
+        namespaceUuid,
+        namespaceName,
+        sourceUuid,
+        sourceName,
+        newDatasetName().getValue(),
+        newPhysicalDatasetName().getValue(),
+        null,
+        newDescription(),
+        null,
+        false);
+  }
+
+  /** Returns new {@link DatasetVersionRow} objects with a specified {@code limit}. */
+  public static Set<DatasetVersionRow> newDatasetVersionsRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID datasetUuid,
+      @NonNull final String datasetName,
+      @NonNull final String namespaceName,
+      final int limit) {
+    return Stream.generate(
+            () -> newDatasetVersionRowWith(now, datasetUuid, datasetName, namespaceName))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static DatasetVersionRow newDatasetVersionRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID datasetUuid,
+      @NonNull final String datasetName,
+      @NonNull final String namespaceName) {
+    // the run row ...
+    return newDatasetVersionRowWith(
+        now, datasetUuid, datasetName, namespaceName, newRunId().getValue());
+  }
+
+  public static DatasetVersionRow newDatasetVersionRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID datasetUuid,
+      @NonNull final String datasetName,
+      @NonNull final String namespaceName,
+      @NonNull final UUID runUuid) {
+    return new DatasetVersionRow(
+        newRowUuid(),
+        now,
+        datasetUuid,
+        newVersion().getValue(),
+        newLifecycleState(),
+        runUuid,
+        datasetName,
+        namespaceName);
+  }
+
+  /** Returns new {@link JobRow} objects with a specified {@code limit}. */
+  public static Set<JobRow> newJobRowsWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      final int limit) {
+    return Stream.generate(() -> newJobRowWith(now, namespaceUuid, namespaceName))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static JobRow newJobRowWith(
+      @NonNull final UUID namespaceUuid, @NonNull final String namespaceName) {
+    return newJobRowWith(NOW, namespaceUuid, namespaceName);
+  }
+
+  public static JobRow newJobRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName) {
+    final String parentJobName = newJobName().getValue();
+    final String jobName = newJobName().getValue();
+    final String jobSimpleName = jobName;
+    return new JobRow(
+        newRowUuid(),
+        newJobType().name(),
+        now,
+        now,
+        namespaceUuid,
+        namespaceName,
+        jobName,
+        jobSimpleName,
+        parentJobName,
+        newDescription(),
+        null,
+        newLocation().toString(),
+        null,
+        null);
+  }
+
+  /** Returns new {@link JobVersionRow} objects with a specified {@code limit}. */
+  public static Set<JobVersionRow> newJobVersionRowsWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID jobUuid,
+      @NonNull final String jobName,
+      @NonNull final Set<DatasetRow> inputs,
+      @NonNull final Set<DatasetRow> outputs,
+      final int limit) {
+    return Stream.generate(
+            () ->
+                newJobVersionRowWith(
+                    now, namespaceUuid, namespaceName, jobUuid, jobName, inputs, outputs))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static JobVersionRow newJobVersionRowWith(
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID jobUuid,
+      @NonNull final String jobName,
+      @NonNull final Set<DatasetRow> inputs,
+      @NonNull final Set<DatasetRow> outputs) {
+    return newJobVersionRowWith(
+        NOW, namespaceUuid, namespaceName, jobUuid, jobName, inputs, outputs);
+  }
+
+  public static JobVersionRow newJobVersionRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID namespaceUuid,
+      @NonNull final String namespaceName,
+      @NonNull final UUID jobUuid,
+      @NonNull final String jobName,
+      @NonNull final Set<DatasetRow> inputs,
+      @NonNull final Set<DatasetRow> outputs) {
+    return new JobVersionRow(
+        newRowUuid(),
+        now,
+        now,
+        jobUuid,
+        jobName,
+        inputs.stream().map(DatasetRow::getUuid).collect(toImmutableList()),
+        outputs.stream().map(DatasetRow::getUuid).collect(toImmutableList()),
+        newLocation().toString(),
+        newVersion().getValue(),
+        newRunId().getValue(),
+        namespaceUuid,
+        namespaceName);
+  }
+
+  public static RunArgsRow newRunArgRow() {
+    final Map<String, String> runArgs = newRunArgs();
+    final String runArgsAsJson = Utils.toJson(runArgs);
+    final String checksum = Utils.checksumFor(runArgs);
+    return new RunArgsRow(newRowUuid(), NOW, runArgsAsJson, checksum);
+  }
+
+  /** Returns new {@link RunRow} objects with a specified {@code limit}. */
+  public static Set<RunRow> newRunRowsWith(
+      @NotNull final Instant now,
+      @NonNull final UUID jobUuid,
+      @NonNull final UUID jobVersionUuid,
+      @NonNull final UUID runArgUuid,
+      final int limit) {
+    return Stream.generate(() -> newRunRowWith(now, jobUuid, jobVersionUuid, runArgUuid))
+        .limit(limit)
+        .collect(toImmutableSet());
+  }
+
+  public static RunRow newRunRowWith(
+      @NonNull final UUID jobUuid,
+      @NonNull final UUID jobVersionUuid,
+      @NonNull final UUID runArgUuid) {
+    return newRunRowWith(NOW, jobUuid, jobVersionUuid, runArgUuid);
+  }
+
+  public static RunRow newRunRowWith(
+      @NotNull final Instant now,
+      @NonNull final UUID jobUuid,
+      @NonNull final UUID jobVersionUuid,
+      @NonNull final UUID runArgUuid) {
+    return new RunRow(
+        newRowUuid(),
+        now,
+        now,
+        jobUuid,
+        jobVersionUuid,
+        null,
+        runArgUuid,
+        now,
+        now,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null);
+  }
+
   /** Returns a new {@code row} uuid. */
   public static UUID newRowUuid() {
     return UUID.randomUUID();
diff --git a/docs/faq.md b/docs/faq.md
new file mode 100644
index 0000000000..24f48b737d
--- /dev/null
+++ b/docs/faq.md
@@ -0,0 +1,41 @@
+# FAQ
+
+### How do I configure a retention policy for metadata?
+
+By default, Marquez does not apply a retention policy on collected metadata. However, you can adjust the maximum retention days for metadata in Marquez. This allows you to better manage your storage space and comply with your organizational retention policies. Below, you'll find examples of how to change retention days in `YAML` and via the [CLI](https://github.com/MarquezProject/marquez/tree/main/api/src/main/java/marquez/cli):
+
+**`YAML`**
+
+To adjust the retention period, add a **`dbRetention`** section in your [`marquez.yml`](https://github.com/MarquezProject/marquez/blob/main/marquez.example.yml):
+
+```yaml
+# Adjusts retention policy
+dbRetention:
+  # Apply retention policy at a frequency of every '15' minutes
+  frequencyMins: 15
+  # Maximum number of rows deleted per batch
+  numberOfRowsPerBatch: 1000
+  # Maximum retention days
+  retentionDays: 7
+```
+
+**`CLI`**
+
+To run a _one-off_, _ad-hoc_ retention policy on your metadata, use the [`db-retention`](https://github.com/MarquezProject/marquez/blob/main/api/src/main/java/marquez/cli/DbRetentionCommand.java) command:
+
+```bash
+java -jar marquez-api.jar db-retention \
+  --number-of-rows-per-batch 1000 \
+  --retention-days 7 \
+  marquez.yml
+```
+
+Use the `--dry-run` flag to output an estimate of the metadata that would be deleted by the retention policy:
+
+```bash
+java -jar marquez-api.jar db-retention \
+  --number-of-rows-per-batch 1000 \
+  --retention-days 7 \
+  --dry-run \
+  marquez.yml
+```
diff --git a/marquez.example.yml b/marquez.example.yml
index 50a3184108..9033438907 100644
--- a/marquez.example.yml
+++ b/marquez.example.yml
@@ -27,6 +27,15 @@ db:
   user: ${POSTGRES_USER}
   password: ${POSTGRES_PASSWORD}
 
+# Adjusts retention policy
+# dbRetention:
+  # Apply retention policy at a frequency of every 'X' minutes (default: 15)
+  # frequencyMins: ${DB_RETENTION_FREQUENCY_MINS:-15}
+  # Maximum number of rows deleted per batch (default: 1000)
+  # numberOfRowsPerBatch: ${DB_RETENTION_NUMBER_OF_ROWS_PER_BATCH:-1000}
+  # Maximum retention days (default: 7)
+  # retentionDays: ${DB_RETENTION_DAYS:-7}
+
 # Enables flyway configuration overrides (see: https://flywaydb.org/documentation/configfiles)
 # flyway:
 #   connectRetries: 3
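For readers following the test utilities added above, here is a minimal, hedged sketch (not part of the patch) of how `TestingDb` and `DbModelGenerator` might be combined when seeding rows for a retention-style test. The class name `RetentionUsageSketch`, the 7-day threshold, and the final assertion step are illustrative assumptions, not code from this change.

```java
// Hypothetical usage sketch only; everything outside TestingDb/DbModelGenerator is an assumption.
package marquez.db;

import static marquez.db.models.DbModelGenerator.newDatasetRowsWith;
import static marquez.db.models.DbModelGenerator.newNamespaceRow;
import static marquez.db.models.DbModelGenerator.newSourceRow;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Set;
import marquez.db.models.DatasetRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
import org.jdbi.v3.core.Jdbi;

class RetentionUsageSketch {
  void seedRowsOlderThanRetention(Jdbi jdbi) {
    // Wrap the test database connection in the forwarding helper introduced by this patch.
    final TestingDb db = TestingDb.newInstance(jdbi);

    // Seed a namespace and source, then datasets created 8 days ago (older than a 7-day policy).
    final NamespaceRow namespaceRow = db.upsert(newNamespaceRow());
    final SourceRow sourceRow = db.upsert(newSourceRow());
    final Instant olderThanRetention = Instant.now().minus(8, ChronoUnit.DAYS);
    final Set<DatasetRow> datasetRows =
        db.upsertAll(
            newDatasetRowsWith(
                olderThanRetention,
                namespaceRow.getUuid(),
                namespaceRow.getName(),
                sourceRow.getUuid(),
                sourceRow.getName(),
                4));

    // After a 7-day retention pass (see docs/faq.md above), a test could open a handle and
    // verify the seeded rows are gone, e.g. by asserting on rowsExist(handle, datasetRows).
  }
}
```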