Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lineage_job_namespace and lineage_job_name OpenLineage macros #38829

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions airflow/providers/openlineage/plugins/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,41 @@
from airflow.models import TaskInstance


def lineage_job_namespace():
"""
Macro function which returns Airflow OpenLineage namespace.
.. seealso::
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
return conf.namespace()


def lineage_job_name(task_instance: TaskInstance):
"""
Macro function which returns Airflow task name in OpenLineage format (`<dag_id>.<task_id>`).
.. seealso::
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
return get_job_name(task_instance)


def lineage_run_id(task_instance: TaskInstance):
"""
Macro function which returns the generated run id for a given task.
Macro function which returns the generated run id (UUID) for a given task.
This can be used to forward the run id from a task to a child run so the job hierarchy is preserved.
.. seealso::
For more information on how to use this operator, take a look at the guide:
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
if TYPE_CHECKING:
assert task_instance.task

return OpenLineageAdapter.build_task_instance_run_id(
dag_id=task_instance.dag_id,
task_id=task_instance.task.task_id,
task_id=task_instance.task_id,
execution_date=task_instance.execution_date,
try_number=task_instance.try_number,
)
Expand All @@ -56,9 +75,13 @@ def lineage_parent_id(task_instance: TaskInstance):
run so the job hierarchy is preserved. Child run can easily create ParentRunFacet from these information.
.. seealso::
For more information on how to use this macro, take a look at the guide:
For more information take a look at the guide:
:ref:`howto/macros:openlineage`
"""
job_name = get_job_name(task_instance.task)
run_id = lineage_run_id(task_instance)
return f"{conf.namespace()}/{job_name}/{run_id}"
return "/".join(
(
lineage_job_namespace(),
lineage_job_name(task_instance),
lineage_run_id(task_instance),
)
)
9 changes: 7 additions & 2 deletions airflow/providers/openlineage/plugins/openlineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
from airflow.plugins_manager import AirflowPlugin
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
lineage_run_id,
)


class OpenLineageProviderPlugin(AirflowPlugin):
Expand All @@ -32,5 +37,5 @@ class OpenLineageProviderPlugin(AirflowPlugin):

name = "OpenLineageProviderPlugin"
if not conf.is_disabled():
macros = [lineage_run_id, lineage_parent_id]
macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, lineage_parent_id]
listeners = [get_openlineage_listener()]
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_operator_class(task: BaseOperator) -> type:
return task.__class__


def get_job_name(task):
def get_job_name(task: TaskInstance) -> str:
return f"{task.dag_id}.{task.task_id}"


Expand Down
59 changes: 39 additions & 20 deletions docs/apache-airflow-providers-openlineage/macros.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,49 @@ Macros included in OpenLineage plugin get integrated to Airflow's main collectio

They can be invoked as a Jinja template, e.g.

Lineage run id
--------------
Lineage job & run macros
------------------------

These macros:
* ``lineage_job_namespace()``
* ``lineage_job_name(task_instance)``
* ``lineage_run_id(task_instance)``

allow injecting pieces of run information of a given Airflow task into the arguments sent to a remote processing job.
For example, ``SparkSubmitOperator`` can be set up like this:

.. code-block:: python
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
op_args=[
"{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}"
], # lineage_run_id macro invoked
provide_context=False,
dag=dag,
)
SparkSubmitOperator(
task_id="my_task",
application="/script.py",
conf={
# separated components
"spark.openlineage.parentJobNamespace": "{{ macros.OpenLineagePlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{ macros.OpenLineagePlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{ macros.OpenLineagePlugin.lineage_run_id(task_instance) }}",
},
)
Lineage parent id
-----------------

Same information, but compacted to one string, can be passed using ``linage_parent_id(task_instance)`` macro:

.. code-block:: python
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
op_args=[
"{{ macros.OpenLineageProviderPlugin.lineage_parent_id(run_id, task_instance) }}"
], # lineage_parent_id macro invoked
provide_context=False,
dag=dag,
)
def my_task_function(templates_dict, **kwargs):
parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/")
...
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
templates_dict={
# joined components as one string `<namespace>/<name>/<run_id>`
"parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
},
provide_context=False,
dag=dag,
)
36 changes: 29 additions & 7 deletions tests/providers/openlineage/plugins/test_macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,38 @@
from unittest import mock

from airflow.providers.openlineage.conf import namespace
from airflow.providers.openlineage.plugins.macros import lineage_parent_id, lineage_run_id
from airflow.providers.openlineage.plugins.macros import (
lineage_job_name,
lineage_job_namespace,
lineage_parent_id,
lineage_run_id,
)

_DAG_NAMESPACE = namespace()


def test_lineage_job_namespace():
assert lineage_job_namespace() == _DAG_NAMESPACE


def test_lineage_job_name():
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
try_number=1,
)
assert lineage_job_name(task_instance) == "dag_id.task_id"


def test_lineage_run_id():
task = mock.MagicMock(
dag_id="dag_id", execution_date="execution_date", try_number=1, task=mock.MagicMock(task_id="task_id")
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
try_number=1,
)
actual = lineage_run_id(task)
actual = lineage_run_id(task_instance)
expected = str(
uuid.uuid3(
uuid.NAMESPACE_URL,
Expand All @@ -42,12 +64,12 @@ def test_lineage_run_id():
@mock.patch("airflow.providers.openlineage.plugins.macros.lineage_run_id")
def test_lineage_parent_id(mock_run_id):
mock_run_id.return_value = "run_id"
task = mock.MagicMock(
task_instance = mock.MagicMock(
dag_id="dag_id",
task_id="task_id",
execution_date="execution_date",
try_number=1,
task=mock.MagicMock(task_id="task_id", dag_id="dag_id"),
)
actual = lineage_parent_id(task_instance=task)
actual = lineage_parent_id(task_instance)
expected = f"{_DAG_NAMESPACE}/dag_id.task_id/run_id"
assert actual == expected