From d8b3170d90f4040ad399b3d32bdfd3a98c8ed13f Mon Sep 17 00:00:00 2001 From: Martynov Maxim Date: Mon, 8 Apr 2024 13:59:48 +0300 Subject: [PATCH] Add lineage_job_namespace and lineage_job_name OpenLineage macros --- .../providers/openlineage/plugins/macros.py | 43 ++++++++++---- .../openlineage/plugins/openlineage.py | 9 ++- airflow/providers/openlineage/utils/utils.py | 2 +- .../macros.rst | 59 ++++++++++++------- .../openlineage/plugins/test_macros.py | 36 ++++++++--- 5 files changed, 109 insertions(+), 40 deletions(-) diff --git a/airflow/providers/openlineage/plugins/macros.py b/airflow/providers/openlineage/plugins/macros.py index 391b29495fe37..ddfceb345909b 100644 --- a/airflow/providers/openlineage/plugins/macros.py +++ b/airflow/providers/openlineage/plugins/macros.py @@ -26,22 +26,41 @@ from airflow.models import TaskInstance +def lineage_job_namespace(): + """ + Macro function which returns Airflow OpenLineage namespace. + + .. seealso:: + For more information take a look at the guide: + :ref:`howto/macros:openlineage` + """ + return conf.namespace() + + +def lineage_job_name(task_instance: TaskInstance): + """ + Macro function which returns Airflow task name in OpenLineage format (`<dag_id>.<task_id>`). + + .. seealso:: + For more information take a look at the guide: + :ref:`howto/macros:openlineage` + """ + return get_job_name(task_instance) + + def lineage_run_id(task_instance: TaskInstance): """ - Macro function which returns the generated run id for a given task. + Macro function which returns the generated run id (UUID) for a given task. This can be used to forward the run id from a task to a child run so the job hierarchy is preserved. .. 
seealso:: - For more information on how to use this operator, take a look at the guide: + For more information take a look at the guide: :ref:`howto/macros:openlineage` """ - if TYPE_CHECKING: - assert task_instance.task - return OpenLineageAdapter.build_task_instance_run_id( dag_id=task_instance.dag_id, - task_id=task_instance.task.task_id, + task_id=task_instance.task_id, execution_date=task_instance.execution_date, try_number=task_instance.try_number, ) @@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance): run so the job hierarchy is preserved. Child run can easily create ParentRunFacet from these information. .. seealso:: - For more information on how to use this macro, take a look at the guide: + For more information take a look at the guide: :ref:`howto/macros:openlineage` """ - job_name = get_job_name(task_instance.task) - run_id = lineage_run_id(task_instance) - return f"{conf.namespace()}/{job_name}/{run_id}" + return "/".join( + ( + lineage_job_namespace(), + lineage_job_name(task_instance), + lineage_run_id(task_instance), + ) + ) diff --git a/airflow/providers/openlineage/plugins/openlineage.py b/airflow/providers/openlineage/plugins/openlineage.py index a0be47a499168..5927929588a5b 100644 --- a/airflow/providers/openlineage/plugins/openlineage.py +++ b/airflow/providers/openlineage/plugins/openlineage.py @@ -19,7 +19,12 @@ from airflow.plugins_manager import AirflowPlugin from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.listener import get_openlineage_listener -from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id +from airflow.providers.openlineage.plugins.macros import ( + lineage_job_name, + lineage_job_namespace, + lineage_parent_id, + lineage_run_id, +) class OpenLineageProviderPlugin(AirflowPlugin): @@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin): name = "OpenLineageProviderPlugin" if not conf.is_disabled(): - macros = [lineage_run_id, 
lineage_parent_id] + macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id] listeners = [get_openlineage_listener()] diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index fb2263b90d7c8..1c777aff761c8 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type: return task.__class__ -def get_job_name(task): +def get_job_name(task: TaskInstance) -> str: return f"{task.dag_id}.{task.task_id}" diff --git a/docs/apache-airflow-providers-openlineage/macros.rst b/docs/apache-airflow-providers-openlineage/macros.rst index 72c1e3a7a554e..3ce285f9664ea 100644 --- a/docs/apache-airflow-providers-openlineage/macros.rst +++ b/docs/apache-airflow-providers-openlineage/macros.rst @@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to Airflow's main collectio They can be invoked as a Jinja template, e.g. -Lineage run id --------------- +Lineage job & run macros +------------------------ + +These macros: + * ``lineage_job_namespace()`` + * ``lineage_job_name(task_instance)`` + * ``lineage_run_id(task_instance)`` + +allow injecting pieces of run information of a given Airflow task into the arguments sent to a remote processing job. +For example, ``SparkSubmitOperator`` can be set up like this: + .. 
code-block:: python - PythonOperator( - task_id="render_template", - python_callable=my_task_function, - op_args=[ - "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}" - ], # lineage_run_id macro invoked - provide_context=False, - dag=dag, - ) + SparkSubmitOperator( + task_id="my_task", + application="/script.py", + conf={ + # separated components + "spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}", + "spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}", + "spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}", + }, + ) Lineage parent id ----------------- + +Same information, but compacted to one string, can be passed using ``lineage_parent_id(task_instance)`` macro: + .. code-block:: python - PythonOperator( - task_id="render_template", - python_callable=my_task_function, - op_args=[ - "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}" - ], # lineage_parent_id macro invoked - provide_context=False, - dag=dag, - ) + def my_task_function(templates_dict, **kwargs): + parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/") + ... 
+ + + PythonOperator( + task_id="render_template", + python_callable=my_task_function, + templates_dict={ + # joined components as one string `<job_namespace>/<job_name>/<run_id>` + "parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}", + }, + provide_context=False, + dag=dag, + ) diff --git a/tests/providers/openlineage/plugins/test_macros.py b/tests/providers/openlineage/plugins/test_macros.py index 9e2160aa196f9..a735312ab612b 100644 --- a/tests/providers/openlineage/plugins/test_macros.py +++ b/tests/providers/openlineage/plugins/test_macros.py @@ -20,16 +20,38 @@ from unittest import mock from airflow.providers.openlineage.conf import namespace -from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id +from airflow.providers.openlineage.plugins.macros import ( + lineage_job_name, + lineage_job_namespace, + lineage_parent_id, + lineage_run_id, +) _DAG_NAMESPACE = namespace() +def test_lineage_job_namespace(): + assert lineage_job_namespace() == _DAG_NAMESPACE + + +def test_lineage_job_name(): + task_instance = mock.MagicMock( + dag_id="dag_id", + task_id="task_id", + execution_date="execution_date", + try_number=1, + ) + assert lineage_job_name(task_instance) == "dag_id.task_id" + + def test_lineage_run_id(): - task = mock.MagicMock( - dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id") + task_instance = mock.MagicMock( + dag_id="dag_id", + task_id="task_id", + execution_date="execution_date", + try_number=1, ) - actual = lineage_run_id(task) + actual = lineage_run_id(task_instance) expected = str( uuid.uuid3( uuid.NAMESPACE_URL, @@ -42,12 +64,12 @@ def test_lineage_run_id(): @mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id") def test_lineage_parent_id(mock_run_id): mock_run_id.return_value = "run_id" - task = mock.MagicMock( + task_instance = mock.MagicMock( dag_id="dag_id", + task_id="task_id", execution_date="execution_date", try_number=1, - 
task=mock.MagicMock(task_id="task_id", dag_id="dag_id"), ) - actual = lineage_parent_id(task_instance=task) + actual = lineage_parent_id(task_instance) expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id" assert actual == expected