Skip to content

Commit

Permalink
Fill run state of the parent run when it is created by child run (MarquezProject#2296)
Browse files Browse the repository at this point in the history

Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>

Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
fm100 and wslulciuc authored Dec 8, 2022
1 parent c868872 commit baa7528
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
49 changes: 32 additions & 17 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -498,25 +498,40 @@ private JobRow createParentJobRunRecord(
RunArgsRow argsRow =
createRunArgsDao()
.upsertRunArgs(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));

Optional<RunState> runState = Optional.ofNullable(event.getEventType()).map(this::getRunState);
RunDao runDao = createRunDao();
RunRow newRow =
createRunDao()
.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType()).map(this::getRunState).orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
runState.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);

runState
.map(rs -> createRunStateDao().upsert(UUID.randomUUID(), now, uuid, rs))
.ifPresent(
runStateRow -> {
UUID runStateUuid = runStateRow.getUuid();
if (RunState.valueOf(runStateRow.getState()).isDone()) {
runDao.updateEndState(uuid, now, runStateUuid);
} else {
runDao.updateStartState(uuid, now, runStateUuid);
}
});

return newParentJobRow;
}

Expand Down
12 changes: 10 additions & 2 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -52,6 +53,7 @@
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -170,7 +172,7 @@ public void testSendOpenLineageEventFailsJsonProcessing() throws IOException {
}

@Test
public void testGetLineageForNonExistantDataset() {
public void testGetLineageForNonExistentDataset() {
CompletableFuture<Integer> response =
this.fetchLineage("dataset:Imadethisup:andthistoo")
.thenApply(HttpResponse::statusCode)
Expand Down Expand Up @@ -419,7 +421,13 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentOnStartEventO
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
assertThat(runsList)
.isNotEmpty()
.hasSize(1)
.first()
.extracting("startedAt", as(InstanceOfAssertFactories.OPTIONAL))
.get()
.isNotNull();
}

@Test
Expand Down

0 comments on commit baa7528

Please sign in to comment.