From 2eee45d09b546a4a2501a10d4e286e12ef9280db Mon Sep 17 00:00:00 2001
From: Michael Collado <collado.mike@gmail.com>
Date: Fri, 29 Jul 2022 15:16:51 -0700
Subject: [PATCH 1/4] Update jobs update function to dedupe aliases

Signed-off-by: Michael Collado <collado.mike@gmail.com>
---
 .../db/migration/R__1_Jobs_view_and_rewrite_function.sql        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
index df1504cd10..12a1273657 100644
--- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
+++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
@@ -111,7 +111,7 @@ BEGIN
                  INNER JOIN fqn jf ON jf.uuid = COALESCE(js.link_target_uuid, j.uuid)
         ON CONFLICT (uuid) DO UPDATE
             SET job_fqn=EXCLUDED.job_fqn,
-                aliases = jobs_fqn.aliases || EXCLUDED.aliases;
+                aliases = (SELECT array_agg(DISTINCT a) FROM (SELECT unnest(jobs_fqn.aliases) AS a UNION SELECT unnest(EXCLUDED.aliases) AS a) al);
     END IF;
     SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid;
     return inserted_job;

From b861598a3890e87a7ed9d7e068fdd862cf174deb Mon Sep 17 00:00:00 2001
From: Michael Collado <collado.mike@gmail.com>
Date: Fri, 29 Jul 2022 15:17:25 -0700
Subject: [PATCH 2/4] Update search query to accommodate symlinked jobs and
 aliases

Signed-off-by: Michael Collado <collado.mike@gmail.com>
---
 api/src/main/java/marquez/db/SearchDao.java   |  8 ++-
 .../test/java/marquez/db/SearchDaoTest.java   | 59 ++++++++++++++++++-
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/api/src/main/java/marquez/db/SearchDao.java b/api/src/main/java/marquez/db/SearchDao.java
index 82b9990c29..1e321b6320 100644
--- a/api/src/main/java/marquez/db/SearchDao.java
+++ b/api/src/main/java/marquez/db/SearchDao.java
@@ -33,9 +33,13 @@ public interface SearchDao {
           + "    FROM datasets AS d\n"
           + "   WHERE  d.name ilike '%' || :query || '%'\n"
           + "   UNION\n"
-          + "  SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
-          + "    FROM jobs_view AS j\n"
+          + "  SELECT DISTINCT ON (j.namespace_name, j.name) \n"
+          + "    'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+          + "    FROM (SELECT namespace_name, name, unnest(aliases) AS alias, updated_at \n"
+          + "           FROM jobs_view WHERE symlink_target_uuid IS NULL\n"
+          + "           ORDER BY updated_at DESC) AS j\n"
           + "   WHERE  j.name ilike '%' || :query || '%'\n"
+          + "   OR j.alias ilike '%' || :query || '%'\n"
           + ") AS results\n"
           + "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
           + "ORDER BY :sort\n"
diff --git a/api/src/test/java/marquez/db/SearchDaoTest.java b/api/src/test/java/marquez/db/SearchDaoTest.java
index 6057761e16..12ee339ab3 100644
--- a/api/src/test/java/marquez/db/SearchDaoTest.java
+++ b/api/src/test/java/marquez/db/SearchDaoTest.java
@@ -7,6 +7,10 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.net.URL;
+import java.sql.SQLException;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
@@ -15,17 +19,24 @@
 import marquez.api.models.SearchFilter;
 import marquez.api.models.SearchResult;
 import marquez.api.models.SearchSort;
+import marquez.common.Utils;
+import marquez.common.models.JobType;
+import marquez.db.models.JobRow;
+import marquez.db.models.NamespaceRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.models.JobMeta;
 import org.jdbi.v3.core.Jdbi;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.postgresql.util.PGobject;
 
 /** The test suite for {@link SearchDao}. */
 @Tag("DataAccessTests")
 @ExtendWith(MarquezJdbiExternalPostgresExtension.class)
 public class SearchDaoTest {
+
   static final int LIMIT = 25;
   static final int NUM_OF_JOBS = 2;
   /**
@@ -37,7 +48,7 @@ public class SearchDaoTest {
   static SearchDao searchDao;
 
   @BeforeAll
-  public static void setUpOnce(final Jdbi jdbi) {
+  public static void setUpOnce(final Jdbi jdbi) throws SQLException {
     searchDao = jdbi.onDemand(SearchDao.class);
 
     DbTestUtils.newDataset(jdbi, "name_ordering_0");
@@ -48,7 +59,51 @@ public static void setUpOnce(final Jdbi jdbi) {
     DbTestUtils.newDataset(jdbi, "time_ordering_1");
     DbTestUtils.newDataset(jdbi, "time_ordering_2");
 
-    DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);
+    ImmutableSet<JobRow> jobRows = DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);
+
+    // add a symlinked job - validate that the number of results is the same
+    jobRows.stream()
+        .findAny()
+        .ifPresent(
+            j -> {
+              try {
+                NamespaceRow namespaceRow =
+                    jdbi.onDemand(NamespaceDao.class)
+                        .findNamespaceByName(j.getNamespaceName())
+                        .get();
+                JobRow symlinkTargetJob =
+                    DbTestUtils.newJobWith(
+                        jdbi,
+                        namespaceRow.getName(),
+                        "a_new_symlink_target_job",
+                        new JobMeta(
+                            JobType.valueOf(j.getType()),
+                            ImmutableSet.copyOf(j.getInputs()),
+                            ImmutableSet.of(),
+                            new URL(j.getLocation()),
+                            ImmutableMap.of(),
+                            j.getDescription().orElse(null),
+                            null));
+                PGobject inputs = new PGobject();
+                inputs.setType("json");
+                inputs.setValue(Utils.getMapper().writeValueAsString(j.getInputs()));
+                jdbi.onDemand(JobDao.class)
+                    .upsertJob(
+                        j.getUuid(),
+                        JobType.valueOf(j.getType()),
+                        j.getCreatedAt(),
+                        namespaceRow.getUuid(),
+                        namespaceRow.getName(),
+                        j.getName(),
+                        j.getDescription().orElse(null),
+                        j.getJobContextUuid().orElse(null),
+                        j.getLocation(),
+                        symlinkTargetJob.getUuid(),
+                        inputs);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
   }
 
   @Test

From 010c97b8f774d614c7d722f8579c0a53494d95c3 Mon Sep 17 00:00:00 2001
From: Michael Collado <collado.mike@gmail.com>
Date: Fri, 29 Jul 2022 15:17:49 -0700
Subject: [PATCH 3/4] Update lineage query to include symlinked jobs in lineage

Signed-off-by: Michael Collado <collado.mike@gmail.com>
---
 api/src/main/java/marquez/db/LineageDao.java  |  13 ++-
 .../test/java/marquez/db/LineageDaoTest.java  | 101 ++++++++++++++++++
 2 files changed, 109 insertions(+), 5 deletions(-)

diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index a029bcd86b..625c861e5a 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -43,8 +43,9 @@ public interface LineageDao {
           + "        SELECT j.uuid AS job_uuid,\n"
           + "        ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n"
           + "        ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n"
-          + "        FROM jobs j\n"
-          + "        LEFT JOIN job_versions v on j.current_version_uuid = v.uuid\n"
+          + "        FROM jobs_view j\n"
+          + "        LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n"
+          + "        LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n"
           + "        LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n"
           + "        GROUP BY j.uuid\n"
           + "    ),\n"
@@ -60,9 +61,10 @@ public interface LineageDao {
           + "        array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n"
           + "        AND depth < :depth"
           + "    )\n"
-          + "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+          + "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
           + "FROM lineage l2\n"
-          + "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n"
+          + "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n"
+          + "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n"
           + "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
   Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);
 
@@ -88,7 +90,8 @@ public interface LineageDao {
           + "    SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
           + "    FROM runs_view r\n"
           + "    INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
-          + "    WHERE jv.job_uuid in (<jobUuid>)\n"
+          + "    INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+          + "    WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
           + "    ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
           + ")\n"
           + "SELECT r.*, ra.args, ctx.context, f.facets,\n"
diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java
index 21a5ab0608..2db046b7de 100644
--- a/api/src/test/java/marquez/db/LineageDaoTest.java
+++ b/api/src/test/java/marquez/db/LineageDaoTest.java
@@ -14,6 +14,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.base.Functions;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -25,10 +26,13 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import marquez.common.models.JobType;
 import marquez.db.LineageTestUtils.DatasetConsumerJob;
 import marquez.db.LineageTestUtils.JobLineage;
 import marquez.db.models.DatasetData;
 import marquez.db.models.JobData;
+import marquez.db.models.JobRow;
+import marquez.db.models.NamespaceRow;
 import marquez.db.models.UpdateLineageRow;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
 import marquez.service.models.LineageEvent;
@@ -44,6 +48,7 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.postgresql.util.PGobject;
 
 @ExtendWith(MarquezJdbiExternalPostgresExtension.class)
 public class LineageDaoTest {
@@ -177,6 +182,102 @@ public void testGetLineage() {
     }
   }
 
+  @Test
+  public void testGetLineageForSymlinkedJob() throws SQLException {
+
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(dataset));
+    List<JobLineage> jobRows =
+        writeDownstreamLineage(
+            openLineageDao,
+            new LinkedList<>(
+                Arrays.asList(
+                    new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
+                    new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
+            jobFacet,
+            dataset);
+
+    NamespaceRow namespaceRow =
+        jdbi.onDemand(NamespaceDao.class)
+            .findNamespaceByName(writeJob.getJob().getNamespaceName())
+            .get();
+
+    PGobject inputs = new PGobject();
+    inputs.setType("json");
+    inputs.setValue("[]");
+
+    String symlinkTargetJobName = "A_new_write_job";
+    JobRow targetJob =
+        jdbi.onDemand(JobDao.class)
+            .upsertJob(
+                UUID.randomUUID(),
+                JobType.valueOf(writeJob.getJob().getType()),
+                writeJob.getJob().getCreatedAt(),
+                namespaceRow.getUuid(),
+                writeJob.getJob().getNamespaceName(),
+                symlinkTargetJobName,
+                writeJob.getJob().getDescription().orElse(null),
+                writeJob.getJob().getJobContextUuid().orElse(null),
+                writeJob.getJob().getLocation(),
+                null,
+                inputs);
+    jdbi.onDemand(JobDao.class)
+        .upsertJob(
+            writeJob.getJob().getUuid(),
+            JobType.valueOf(writeJob.getJob().getType()),
+            writeJob.getJob().getCreatedAt(),
+            namespaceRow.getUuid(),
+            writeJob.getJob().getNamespaceName(),
+            writeJob.getJob().getName(),
+            writeJob.getJob().getDescription().orElse(null),
+            writeJob.getJob().getJobContextUuid().orElse(null),
+            writeJob.getJob().getLocation(),
+            targetJob.getUuid(),
+            inputs);
+
+    // fetch the first "targetJob" lineage.
+    Set<JobData> connectedJobs =
+        lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2);
+
+    // 20 readJobs + 1 downstreamJob for each (20) + 1 write job = 41
+    assertThat(connectedJobs).size().isEqualTo(41);
+
+    Set<UUID> jobIds = connectedJobs.stream().map(JobData::getUuid).collect(Collectors.toSet());
+    // expect the symlink target job to appear in the lineage in place of the original writeJob
+    assertThat(jobIds).contains(targetJob.getUuid());
+
+    // expect all downstream jobs
+    Set<UUID> readJobUUIDs =
+        jobRows.stream()
+            .flatMap(row -> Stream.concat(Stream.of(row), row.getDownstreamJobs().stream()))
+            .map(JobLineage::getId)
+            .collect(Collectors.toSet());
+    assertThat(jobIds).containsAll(readJobUUIDs);
+
+    Map<UUID, JobData> actualJobRows =
+        connectedJobs.stream().collect(Collectors.toMap(JobData::getUuid, Functions.identity()));
+    for (JobLineage expected : jobRows) {
+      JobData job = actualJobRows.get(expected.getId());
+      assertThat(job.getInputUuids())
+          .containsAll(
+              expected.getInput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
+      assertThat(job.getOutputUuids())
+          .containsAll(
+              expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
+    }
+    Set<UUID> lineageForOriginalJob =
+        lineageDao.getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2).stream()
+            .map(JobData::getUuid)
+            .collect(Collectors.toSet());
+    assertThat(lineageForOriginalJob).isEqualTo(jobIds);
+  }
+
   @Test
   public void testGetLineageWithJobThatHasNoDownstreamConsumers() {
 

From 48130fe0e6fee7d64c200e2243c18de809c69b20 Mon Sep 17 00:00:00 2001
From: Michael Collado <collado.mike@gmail.com>
Date: Mon, 1 Aug 2022 12:25:12 -0700
Subject: [PATCH 4/4] Updated search test to validate symlink target jobs are
 returned

Signed-off-by: Michael Collado <collado.mike@gmail.com>
---
 api/src/test/java/marquez/db/SearchDaoTest.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/api/src/test/java/marquez/db/SearchDaoTest.java b/api/src/test/java/marquez/db/SearchDaoTest.java
index 12ee339ab3..3130e5ca85 100644
--- a/api/src/test/java/marquez/db/SearchDaoTest.java
+++ b/api/src/test/java/marquez/db/SearchDaoTest.java
@@ -45,6 +45,8 @@ public class SearchDaoTest {
    */
   static final int NUM_OF_DATASETS = 12;
 
+  public static final String NEW_SYMLINK_TARGET_JOB = "a_new_symlink_target_job";
+
   static SearchDao searchDao;
 
   @BeforeAll
@@ -61,7 +63,7 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException {
 
     ImmutableSet<JobRow> jobRows = DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);
 
-    // add a symlinked job - validate that the number of results is the same
+    // add a symlinked job - the unit test below validates that the number of results is unchanged
     jobRows.stream()
         .findAny()
         .ifPresent(
@@ -75,7 +77,7 @@ public static void setUpOnce(final Jdbi jdbi) throws SQLException {
                     DbTestUtils.newJobWith(
                         jdbi,
                         namespaceRow.getName(),
-                        "a_new_symlink_target_job",
+                        NEW_SYMLINK_TARGET_JOB,
                         new JobMeta(
                             JobType.valueOf(j.getType()),
                             ImmutableSet.copyOf(j.getInputs()),
@@ -127,6 +129,12 @@ public void testSearch() {
     final List<SearchResult> resultsWithOnlyJobs =
         resultsGroupedByType.get(SearchResult.ResultType.JOB);
     assertThat(resultsWithOnlyJobs).hasSize(NUM_OF_JOBS);
+
+    // Even though we searched for "test" and the symlink target doesn't have "test" in its name,
+    // it is part of the search results because the original job had "test" in its name.
+    assertThat(resultsWithOnlyJobs)
+        .filteredOn(j -> j.getName().equals(NEW_SYMLINK_TARGET_JOB))
+        .isNotEmpty();
   }
 
   @Test