Skip to content

Commit

Permalink
Use UUIDv7 for OpenLineage runIds (apache#39889)
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus authored and romsharon98 committed Jul 26, 2024
1 parent ce0acc3 commit 33c0359
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 92 deletions.
43 changes: 33 additions & 10 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from __future__ import annotations

import traceback
import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING

Expand All @@ -36,13 +35,16 @@
SourceCodeLocationJobFacet,
)
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.uuid import generate_static_uuid

from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin

if TYPE_CHECKING:
from datetime import datetime

from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
Expand Down Expand Up @@ -111,15 +113,25 @@ def _read_yaml_config(path: str) -> dict | None:
return yaml.safe_load(config_file)

@staticmethod
def build_dag_run_id(dag_id, dag_run_id):
return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{conf.namespace()}.{dag_id}.{dag_run_id}"))
def build_dag_run_id(dag_id: str, execution_date: datetime) -> str:
return str(
generate_static_uuid(
instant=execution_date,
data=f"{conf.namespace()}.{dag_id}".encode(),
)
)

@staticmethod
def build_task_instance_run_id(dag_id, task_id, execution_date, try_number):
def build_task_instance_run_id(
dag_id: str,
task_id: str,
try_number: int,
execution_date: datetime,
):
return str(
uuid.uuid3(
uuid.NAMESPACE_URL,
f"{conf.namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}",
generate_static_uuid(
instant=execution_date,
data=f"{conf.namespace()}.{dag_id}.{task_id}.{try_number}".encode(),
)
)

Expand Down Expand Up @@ -306,7 +318,10 @@ def dag_started(
eventTime=dag_run.start_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=self._build_run(
run_id=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
job_name=dag_run.dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
Expand All @@ -328,7 +343,12 @@ def dag_success(self, dag_run: DagRun, msg: str):
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id)),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
),
inputs=[],
outputs=[],
producer=_PRODUCER,
Expand All @@ -347,7 +367,10 @@ def dag_failed(self, dag_run: DagRun, msg: str):
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(dag_run.dag_id, dag_run.run_id),
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
execution_date=dag_run.execution_date,
),
facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")},
),
inputs=[],
Expand Down
21 changes: 15 additions & 6 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,16 @@ def on_running():
# we return here because Airflow 2.3 needs task from deferred state
if task_instance.next_method is not None:
return
parent_run_id = self.adapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
parent_run_id = self.adapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
)

task_uuid = self.adapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
execution_date=task_instance.execution_date,
)
event_type = RunState.RUNNING.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -184,13 +187,16 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,

@print_warning(self.log)
def on_success():
parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
)

task_uuid = OpenLineageAdapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
execution_date=task_instance.execution_date,
try_number=_get_try_number_success(task_instance),
execution_date=task_instance.execution_date,
)
event_type = RunState.COMPLETE.value.lower()
operator_name = task.task_type.lower()
Expand Down Expand Up @@ -246,13 +252,16 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s

@print_warning(self.log)
def on_failure():
parent_run_id = OpenLineageAdapter.build_dag_run_id(dag.dag_id, dagrun.run_id)
parent_run_id = OpenLineageAdapter.build_dag_run_id(
dag_id=dag.dag_id,
execution_date=dagrun.execution_date,
)

task_uuid = OpenLineageAdapter.build_task_instance_run_id(
dag_id=dag.dag_id,
task_id=task.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
execution_date=task_instance.execution_date,
)
event_type = RunState.FAIL.value.lower()
operator_name = task.task_type.lower()
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/plugins/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ def lineage_run_id(task_instance: TaskInstance):
return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
task_id=task_instance.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
execution_date=task_instance.execution_date,
)


Expand Down
4 changes: 2 additions & 2 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ dependencies:
- apache-airflow>=2.7.0
- apache-airflow-providers-common-sql>=1.6.0
- attrs>=22.2
- openlineage-integration-common>=0.28.0
- openlineage-python>=0.28.0
- openlineage-integration-common>=1.15.0
- openlineage-python>=1.15.0

integrations:
- integration-name: OpenLineage
Expand Down
4 changes: 2 additions & 2 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -900,8 +900,8 @@
"apache-airflow-providers-common-sql>=1.6.0",
"apache-airflow>=2.7.0",
"attrs>=22.2",
"openlineage-integration-common>=0.28.0",
"openlineage-python>=0.28.0"
"openlineage-integration-common>=1.15.0",
"openlineage-python>=1.15.0"
],
"devel-deps": [],
"plugins": [
Expand Down
110 changes: 73 additions & 37 deletions tests/providers/openlineage/plugins/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,10 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand All @@ -538,7 +538,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
dagrun_mock.start_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
uuid.uuid3.return_value = random_uuid
generate_static_uuid.return_value = random_uuid

adapter.dag_started(
dag_run=dagrun_mock,
Expand Down Expand Up @@ -582,10 +582,10 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand All @@ -598,7 +598,7 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
dagrun_mock.end_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
uuid.uuid3.return_value = random_uuid
generate_static_uuid.return_value = random_uuid

adapter.dag_success(
dag_run=dagrun_mock,
Expand Down Expand Up @@ -632,10 +632,10 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
mock_stats_timer.assert_called_with("ol.emit.attempts")


@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
Expand All @@ -648,7 +648,7 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
dagrun_mock.end_date = event_time
dagrun_mock.run_id = run_id
dagrun_mock.dag_id = dag_id
uuid.uuid3.return_value = random_uuid
generate_static_uuid.return_value = random_uuid

adapter.dag_failed(
dag_run=dagrun_mock,
Expand Down Expand Up @@ -707,46 +707,82 @@ def test_openlineage_adapter_stats_emit_failed(

def test_build_dag_run_id_is_valid_uuid():
dag_id = "test_dag"
dag_run_id = "run_1"
result = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
assert uuid.UUID(result)
execution_date = datetime.datetime.now()
result = OpenLineageAdapter.build_dag_run_id(
dag_id=dag_id,
execution_date=execution_date,
)
uuid_result = uuid.UUID(result)
assert uuid_result
assert uuid_result.version == 7


def test_build_dag_run_id_different_inputs_give_different_results():
result1 = OpenLineageAdapter.build_dag_run_id("dag1", "run1")
result2 = OpenLineageAdapter.build_dag_run_id("dag2", "run2")
assert result1 != result2
def test_build_dag_run_id_same_input_give_same_result():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
assert result1 == result2


def test_build_dag_run_id_uses_correct_methods_underneath():
dag_id = "test_dag"
dag_run_id = "run_1"
expected = str(uuid.uuid3(uuid.NAMESPACE_URL, f"{namespace()}.{dag_id}.{dag_run_id}"))
actual = OpenLineageAdapter.build_dag_run_id(dag_id, dag_run_id)
assert actual == expected
def test_build_dag_run_id_different_inputs_give_different_results():
result1 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag1",
execution_date=datetime.datetime.now(),
)
result2 = OpenLineageAdapter.build_dag_run_id(
dag_id="dag2",
execution_date=datetime.datetime.now(),
)
assert result1 != result2


def test_build_task_instance_run_id_is_valid_uuid():
result = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task_1", "2023-01-01", 1)
assert uuid.UUID(result)
result = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag_id",
task_id="task_id",
try_number=1,
execution_date=datetime.datetime.now(),
)
uuid_result = uuid.UUID(result)
assert uuid_result
assert uuid_result.version == 7


def test_build_task_instance_run_id_different_inputs_gives_different_results():
result1 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task1", "2023-01-01", 1)
result2 = OpenLineageAdapter.build_task_instance_run_id("dag_1", "task2", "2023-01-02", 2)
assert result1 != result2
def test_build_task_instance_run_id_same_input_gives_same_result():
result1 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
execution_date=datetime.datetime(2024, 1, 1, 1, 1, 1),
)
assert result1 == result2


def test_build_task_instance_run_id_uses_correct_methods_underneath():
dag_id = "dag_1"
task_id = "task_1"
execution_date = "2023-01-01"
try_number = 1
expected = str(
uuid.uuid3(uuid.NAMESPACE_URL, f"{namespace()}.{dag_id}.{task_id}.{execution_date}.{try_number}")
def test_build_task_instance_run_id_different_inputs_gives_different_results():
result1 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag1",
task_id="task1",
try_number=1,
execution_date=datetime.datetime.now(),
)
actual = OpenLineageAdapter.build_task_instance_run_id(dag_id, task_id, execution_date, try_number)
assert actual == expected
result2 = OpenLineageAdapter.build_task_instance_run_id(
dag_id="dag2",
task_id="task2",
try_number=2,
execution_date=datetime.datetime.now(),
)
assert result1 != result2


def test_configuration_precedence_when_creating_ol_client():
Expand Down
Loading

0 comments on commit 33c0359

Please sign in to comment.