diff --git a/airflow/providers/openlineage/facets/AirflowJobFacet.json b/airflow/providers/openlineage/facets/AirflowJobFacet.json new file mode 100644 index 0000000000000..51a9954de376a --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowJobFacet.json @@ -0,0 +1,40 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$defs": { + "AirflowJobFacet": { + "allOf": [ + { + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" + }, + { + "type": "object", + "properties": { + "taskTree": { + "description": "The hierarchical structure of tasks in the DAG.", + "type": "object", + "additionalProperties": true + }, + "taskGroups": { + "description": "Information about all task groups within the DAG.", + "type": "object", + "additionalProperties": true + }, + "tasks": { + "description": "Details of all individual tasks within the DAG.", + "type": "object", + "additionalProperties": true + } + }, + "required": ["taskTree", "taskGroups", "tasks"] + } + ], + "type": "object" + } + }, + "type": "object", + "properties": { + "airflow": { + "$ref": "#/$defs/AirflowJobFacet" + } + } + } diff --git a/airflow/providers/openlineage/facets/AirflowRunFacet.json b/airflow/providers/openlineage/facets/AirflowRunFacet.json new file mode 100644 index 0000000000000..504fb1bc3ab16 --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowRunFacet.json @@ -0,0 +1,254 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$defs": { + "AirflowRunFacet": { + "allOf": [ + { + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" + }, + { + "type": "object", + "properties": { + "dag": { + "$ref": "#/$defs/DAG" + }, + "dagRun": { + "$ref": "#/$defs/DagRun" + }, + "taskInstance": { + "$ref": "#/$defs/TaskInstance" + }, + "task": { + "$ref": "#/$defs/Task" + }, + "taskUuid": { + "type": "string" + } + }, + "required": [ + "dag", + "dagRun", + "taskInstance", + "task", + "taskUuid" + ] + } + ] + }, + "Task": 
{ + "type": "object", + "properties": { + "depends_on_past": { + "type": "boolean" + }, + "downstream_task_ids": { + "type": "string" + }, + "execution_timeout": { + "type": "string" + }, + "executor_config": { + "type": "object", + "additionalProperties": true + }, + "ignore_first_depends_on_past": { + "type": "boolean" + }, + "is_setup": { + "type": "boolean" + }, + "is_teardown": { + "type": "boolean" + }, + "mapped": { + "type": "boolean" + }, + "max_active_tis_per_dag": { + "type": "integer" + }, + "max_active_tis_per_dagrun": { + "type": "integer" + }, + "max_retry_delay": { + "type": "string" + }, + "multiple_outputs": { + "type": "boolean" + }, + "operator_class": { + "description": "Module + class name of the operator", + "type": "string" + }, + "owner": { + "type": "string" + }, + "priority_weight": { + "type": "integer" + }, + "queue": { + "type": "string" + }, + "retries": { + "type": "integer" + }, + "retry_exponential_backoff": { + "type": "boolean" + }, + "run_as_user": { + "type": "string" + }, + "task_id": { + "type": "string" + }, + "trigger_rule": { + "type": "string" + }, + "upstream_task_ids": { + "type": "string" + }, + "wait_for_downstream": { + "type": "boolean" + }, + "wait_for_past_depends_before_skipping": { + "type": "boolean" + }, + "weight_rule": { + "type": "string" + }, + "task_group": { + "description": "Task group related information", + "type": "object", + "properties": { + "group_id": { + "type": "string" + }, + "downstream_group_ids": { + "type": "string" + }, + "downstream_task_ids": { + "type": "string" + }, + "prefix_group_id": { + "type": "boolean" + }, + "tooltip": { + "type": "string" + }, + "upstream_group_ids": { + "type": "string" + }, + "upstream_task_ids": { + "type": "string" + } + }, + "additionalProperties": true, + "required": ["group_id"] + } + }, + "additionalProperties": true, + "required": [ + "task_id" + ] + }, + "DAG": { + "type": "object", + "properties": { + "dag_id": { + "type": "string" + }, + 
"description": { + "type": "string" + }, + "owner": { + "type": "string" + }, + "schedule_interval": { + "type": "string" + }, + "start_date": { + "type": "string", + "format": "date-time" + }, + "tags": { + "type": "string" + }, + "timetable": { + "description": "Describes timetable (successor of schedule_interval)", + "type": "object", + "additionalProperties": true + } + }, + "additionalProperties": true, + "required": [ + "dag_id", + "start_date" + ] + }, + "TaskInstance": { + "type": "object", + "properties": { + "duration": { + "type": "number" + }, + "map_index": { + "type": "integer" + }, + "pool": { + "type": "string" + }, + "try_number": { + "type": "integer" + } + }, + "additionalProperties": true, + "required": [ + "pool", + "try_number" + ] + }, + "DagRun": { + "type": "object", + "properties": { + "conf": { + "type": "object", + "additionalProperties": true + }, + "dag_id": { + "type": "string" + }, + "data_interval_start": { + "type": "string", + "format": "date-time" + }, + "data_interval_end": { + "type": "string", + "format": "date-time" + }, + "external_trigger": { + "type": "boolean" + }, + "run_id": { + "type": "string" + }, + "run_type": { + "type": "string" + }, + "start_date": { + "type": "string", + "format": "date-time" + } + }, + "additionalProperties": true, + "required": [ + "dag_id", + "run_id" + ] + } + }, + "type": "object", + "properties": { + "airflow": { + "$ref": "#/$defs/AirflowRunFacet" + } + } +} diff --git a/airflow/providers/openlineage/facets/AirflowStateRunFacet.json b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json new file mode 100644 index 0000000000000..2788e17282ce8 --- /dev/null +++ b/airflow/providers/openlineage/facets/AirflowStateRunFacet.json @@ -0,0 +1,34 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$defs": { + "AirflowStateRunFacet": { + "allOf": [ + { + "$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet" + }, + { + "type": "object", + 
"properties": { + "dagRunState": { + "description": "The final status of the entire DagRun", + "type": "string" + }, + "tasksState": { + "description": "Mapping of task IDs to their respective states", + "type": "object", + "additionalProperties": true + } + }, + "required": ["dagRunState", "tasksState"] + } + ], + "type": "object" + } + }, + "type": "object", + "properties": { + "airflowState": { + "$ref": "#/$defs/AirflowStateRunFacet" + } + } + } diff --git a/airflow/providers/openlineage/facets/__init__.py b/airflow/providers/openlineage/facets/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/openlineage/facets/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 608bd568e11c9..339ad55fdd2ce 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -38,7 +38,10 @@ from openlineage.client.uuid import generate_static_uuid from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf -from airflow.providers.openlineage.utils.utils import OpenLineageRedactor +from airflow.providers.openlineage.utils.utils import ( + OpenLineageRedactor, + get_airflow_state_run_facet, +) from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin @@ -321,12 +324,19 @@ def dag_started( msg: str, nominal_start_time: str, nominal_end_time: str, + job_facets: dict[str, BaseFacet] | None = None, # Custom job facets ): try: event = RunEvent( eventType=RunState.START, eventTime=dag_run.start_date.isoformat(), - job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG), + job=self._build_job( + job_name=dag_run.dag_id, + job_type=_JOB_TYPE_DAG, + job_description=dag_run.dag.description if dag_run.dag else None, + owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, + job_facets=job_facets, + ), run=self._build_run( run_id=self.build_dag_run_id( dag_id=dag_run.dag_id, @@ -358,6 +368,7 @@ def dag_success(self, dag_run: DagRun, msg: str): dag_id=dag_run.dag_id, execution_date=dag_run.execution_date, ), + facets={**get_airflow_state_run_facet(dag_run)}, ), inputs=[], outputs=[], @@ -381,7 +392,10 @@ def dag_failed(self, dag_run: DagRun, msg: str): dag_id=dag_run.dag_id, execution_date=dag_run.execution_date, ), - facets={"errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python")}, + facets={ + "errorMessage": ErrorMessageRunFacet(message=msg, programmingLanguage="python"), + **get_airflow_state_run_facet(dag_run), + }, ), inputs=[], outputs=[], diff --git 
a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py index 925f386d6ecbe..fd1f6ef4b55f9 100644 --- a/airflow/providers/openlineage/plugins/facets.py +++ b/airflow/providers/openlineage/plugins/facets.py @@ -39,15 +39,56 @@ class AirflowMappedTaskRunFacet(BaseFacet): @classmethod def from_task_instance(cls, task_instance): - task = task_instance.task - from airflow.providers.openlineage.utils.utils import get_operator_class + from airflow.providers.openlineage.utils.utils import get_fully_qualified_class_name return cls( mapIndex=task_instance.map_index, - operatorClass=f"{get_operator_class(task).__module__}.{get_operator_class(task).__name__}", + operatorClass=get_fully_qualified_class_name(task_instance.task), ) +@define(slots=False) +class AirflowJobFacet(BaseFacet): + """ + Composite Airflow job facet. + + This facet encapsulates all the necessary information to re-create full scope of an Airflow DAG logic, + enabling reconstruction, visualization, and analysis of DAGs in a comprehensive manner. + It includes detailed representations of the tasks, task groups, and their hierarchical relationships, + making it possible to draw a graph that visually represents the entire DAG structure (like in Airflow UI). + It also indicates whether a task should emit an OpenLineage (OL) event, enabling consumers to anticipate + the number of events and identify the tasks from which they can expect these events. + + Attributes: + taskTree: A dictionary representing the hierarchical structure of tasks in the DAG. + taskGroups: A dictionary that contains information about task groups within the DAG. + tasks: A dictionary detailing individual tasks within the DAG. + """ + + taskTree: dict + taskGroups: dict + tasks: dict + + +@define(slots=False) +class AirflowStateRunFacet(BaseFacet): + """ + Airflow facet providing state information. 
+ + This facet is designed to be sent at a completion event, offering state information about + the DAG run and each individual task. This information is crucial for understanding + the execution flow and comprehensive post-run analysis and debugging, including why certain tasks + did not emit events, which can occur due to the use of control flow operators like the BranchOperator. + + Attributes: + dagRunState: This indicates the final status of the entire DAG run (e.g., "success", "failed"). + tasksState: A dictionary mapping task IDs to their respective states. (e.g., "failed", "skipped"). + """ + + dagRunState: str + tasksState: dict[str, str] + + @define(slots=False) class AirflowRunFacet(BaseFacet): """Composite Airflow run facet.""" diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 4a1085168aa8e..76b60d61b7119 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -30,6 +30,7 @@ from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( + get_airflow_job_facet, get_airflow_run_facet, get_custom_facets, get_job_name, @@ -367,6 +368,9 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None: msg=msg, nominal_start_time=data_interval_start, nominal_end_time=data_interval_end, + # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects, + # as it causes lack of some TaskGroup attributes and crashes event emission. 
+ job_facets={**get_airflow_job_facet(dag_run=dag_run)}, ) @hookimpl diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index ff6ad63970a66..6904b32d2db50 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -20,8 +20,10 @@ import datetime import json import logging -from contextlib import suppress +import re +from contextlib import redirect_stdout, suppress from functools import wraps +from io import StringIO from typing import TYPE_CHECKING, Any, Iterable import attrs @@ -32,8 +34,11 @@ from airflow.models import DAG, BaseOperator, MappedOperator from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.facets import ( + AirflowJobFacet, AirflowMappedTaskRunFacet, AirflowRunFacet, + AirflowStateRunFacet, + BaseFacet, UnknownOperatorAttributeRunFacet, UnknownOperatorInstance, ) @@ -41,6 +46,7 @@ is_dag_lineage_enabled, is_task_lineage_enabled, ) +from airflow.serialization.serialized_objects import SerializedBaseOperator from airflow.utils.context import AirflowContextDeprecationWarning from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key from airflow.utils.module_loading import import_string @@ -78,7 +84,11 @@ def get_custom_facets(task_instance: TaskInstance | None = None) -> dict[str, An def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str: - return operator.__class__.__module__ + "." + operator.__class__.__name__ + if isinstance(operator, (MappedOperator, SerializedBaseOperator)): + # as in airflow.api_connexion.schemas.common_schema.ClassReferenceSchema + return operator._task_module + "." + operator._task_type # type: ignore + op_class = get_operator_class(operator) + return op_class.__module__ + "." 
+ op_class.__name__ def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool: @@ -168,7 +178,7 @@ def _include_fields(self): class DagInfo(InfoJsonEncodable): """Defines encoding DAG object to JSON.""" - includes = ["dag_id", "schedule_interval", "tags", "start_date"] + includes = ["dag_id", "description", "owner", "schedule_interval", "start_date", "tags"] casts = {"timetable": lambda dag: dag.timetable.serialize() if getattr(dag, "timetable", None) else None} renames = {"_dag_id": "dag_id"} @@ -193,9 +203,9 @@ class TaskInstanceInfo(InfoJsonEncodable): includes = ["duration", "try_number", "pool"] casts = { - "map_index": lambda ti: ti.map_index - if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1 - else None + "map_index": lambda ti: ( + ti.map_index if hasattr(ti, "map_index") and getattr(ti, "map_index") != -1 else None + ) } @@ -234,9 +244,11 @@ class TaskInfo(InfoJsonEncodable): ] casts = { "operator_class": lambda task: task.task_type, - "task_group": lambda task: TaskGroupInfo(task.task_group) - if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None) - else None, + "task_group": lambda task: ( + TaskGroupInfo(task.task_group) + if hasattr(task, "task_group") and getattr(task.task_group, "_group_id", None) + else None + ), } @@ -262,20 +274,158 @@ def get_airflow_run_facet( task_instance: TaskInstance, task: BaseOperator, task_uuid: str, -): +) -> dict[str, BaseFacet]: return { - "airflow": attrs.asdict( - AirflowRunFacet( - dag=DagInfo(dag), - dagRun=DagRunInfo(dag_run), - taskInstance=TaskInstanceInfo(task_instance), - task=TaskInfo(task), - taskUuid=task_uuid, - ) + "airflow": AirflowRunFacet( + dag=DagInfo(dag), + dagRun=DagRunInfo(dag_run), + taskInstance=TaskInstanceInfo(task_instance), + task=TaskInfo(task), + taskUuid=task_uuid, + ) + } + + +def get_airflow_job_facet(dag_run: DagRun) -> dict[str, BaseFacet]: + if not dag_run.dag: + return {} + return { + "airflow": AirflowJobFacet( + 
taskTree=_get_parsed_dag_tree(dag_run.dag), + taskGroups=_get_task_groups_details(dag_run.dag), + tasks=_get_tasks_details(dag_run.dag), + ) + } + + +def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, BaseFacet]: + return { + "airflowState": AirflowStateRunFacet( + dagRunState=dag_run.get_state(), + tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()}, ) } +def _safe_get_dag_tree_view(dag: DAG) -> list[str]: + # get_tree_view() has been added in Airflow 2.8.2 + if hasattr(dag, "get_tree_view"): + return dag.get_tree_view().splitlines() + + with redirect_stdout(StringIO()) as stdout: + dag.tree_view() + return stdout.getvalue().splitlines() + + +def _get_parsed_dag_tree(dag: DAG) -> dict: + """ + Get DAG's tasks hierarchy representation. + + While the task dependencies are defined as following: + task >> [task_2, task_4] >> task_7 + task_3 >> task_5 + task_6 # has no dependencies, it's a root and a leaf + + The result of this function will look like: + { + "task": { + "task_2": { + "task_7": {} + }, + "task_4": { + "task_7": {} + } + }, + "task_3": { + "task_5": {} + }, + "task_6": {} + } + """ + lines = _safe_get_dag_tree_view(dag) + task_dict: dict[str, dict] = {} + parent_map: dict[int, tuple[str, dict]] = {} + + for line in lines: + stripped_line = line.strip() + if not stripped_line: + continue + + # Determine the level by counting the leading spaces, assuming 4 spaces per level + # as defined in airflow.models.dag.DAG._generate_tree_view() + level = (len(line) - len(stripped_line)) // 4 + # airflow.models.baseoperator.BaseOperator.__repr__ is used in DAG tree + # <Task({self.task_type}): {self.task_id}> + match = re.match(r"^<Task\((.+)\): ([^\s]+)>$", stripped_line) + if not match: + return {} + current_task_id = match[2] + + if level == 0: # It's a root task + task_dict[current_task_id] = {} + parent_map[level] = (current_task_id, task_dict[current_task_id]) + else: + # Find the immediate parent task + parent_task, parent_dict = parent_map[(level - 1)] + # Create new dict for the current 
task + parent_dict[current_task_id] = {} + # Update this task in the parent map + parent_map[level] = (current_task_id, parent_dict[current_task_id]) + + return task_dict + + +def _get_tasks_details(dag: DAG) -> dict: + tasks = { + single_task.task_id: { + "operator": get_fully_qualified_class_name(single_task), + "task_group": single_task.task_group.group_id if single_task.task_group else None, + "emits_ol_events": _emits_ol_events(single_task), + "ui_color": single_task.ui_color, + "ui_fgcolor": single_task.ui_fgcolor, + "ui_label": single_task.label, + "is_setup": single_task.is_setup, + "is_teardown": single_task.is_teardown, + } + for single_task in dag.tasks + } + + return tasks + + +def _get_task_groups_details(dag: DAG) -> dict: + return { + tg_id: { + "parent_group": tg.parent_group.group_id, + "tooltip": tg.tooltip, + "ui_color": tg.ui_color, + "ui_fgcolor": tg.ui_fgcolor, + "ui_label": tg.label, + } + for tg_id, tg in dag.task_group_dict.items() + } + + +def _emits_ol_events(task: BaseOperator | MappedOperator) -> bool: + config_selective_enabled = is_selective_lineage_enabled(task) + config_disabled_for_operators = is_operator_disabled(task) + # empty operators without callbacks/outlets are skipped for optimization by Airflow + # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks + is_skipped_as_empty_operator = all( + ( + task.inherits_from_empty_operator, + not task.on_execute_callback, + not task.on_success_callback, + not task.outlets, + ) + ) + + emits_ol_events = all( + (config_selective_enabled, not config_disabled_for_operators, not is_skipped_as_empty_operator) + ) + return emits_ol_events + + def get_unknown_source_attribute_run_facet(task: BaseOperator, name: str | None = None): if not name: name = get_operator_class(task).__name__ diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 7ac38eeaf2f20..8c5164d6759a6 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -362,6 +362,7 
@@ dagrun DagRunPydantic dagruns DagRunState +dagRunState DAGs Dask dask @@ -1606,6 +1607,7 @@ taskflow TaskGroup taskgroup TaskGroups +taskGroups TaskInstance taskinstance TaskInstanceKey @@ -1613,6 +1615,8 @@ taskinstancekey taskmeta taskmixin tasksetmeta +tasksState +taskTree tblproperties TCP tcp diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index 6f010b1a2d710..0212f1402c340 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -38,6 +38,11 @@ ) from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState +from airflow import DAG +from airflow.models.dagrun import DagRun, DagRunState +from airflow.models.taskinstance import TaskInstance, TaskInstanceState +from airflow.operators.bash import BashOperator +from airflow.operators.empty import EmptyOperator from airflow.providers.openlineage.conf import ( config_path, custom_extractors, @@ -49,6 +54,11 @@ ) from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.plugins.adapter import _PRODUCER, OpenLineageAdapter +from airflow.providers.openlineage.plugins.facets import ( + AirflowStateRunFacet, +) +from airflow.providers.openlineage.utils.utils import get_airflow_job_facet +from airflow.utils.task_group import TaskGroup from tests.test_utils.config import conf_vars pytestmark = pytest.mark.db_test @@ -534,19 +544,33 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat dag_id = "dag_id" run_id = str(uuid.uuid4()) - dagrun_mock = MagicMock() - dagrun_mock.start_date = event_time - dagrun_mock.run_id = run_id - dagrun_mock.dag_id = dag_id + with DAG(dag_id=dag_id, description="dag desc", start_date=datetime.datetime(2024, 6, 1)) as dag: + tg = TaskGroup(group_id="tg1") + tg2 = TaskGroup(group_id="tg2", parent_group=tg) + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") 
# noqa: F841 + task_1 = BashOperator(task_id="task_1", bash_command="exit 0;", task_group=tg) # noqa: F841 + task_2 = EmptyOperator(task_id="task_2.test.dot", task_group=tg2) # noqa: F841 + + dag_run = DagRun( + dag_id=dag_id, + run_id=run_id, + start_date=event_time, + execution_date=event_time, + ) + dag_run.dag = dag generate_static_uuid.return_value = random_uuid + job_facets = {**get_airflow_job_facet(dag_run)} + adapter.dag_started( - dag_run=dagrun_mock, + dag_run=dag_run, msg="", nominal_start_time=event_time.isoformat(), nominal_end_time=event_time.isoformat(), + job_facets=job_facets, ) + assert len(client.emit.mock_calls) == 1 assert ( call( RunEvent( @@ -565,9 +589,16 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat namespace=namespace(), name="dag_id", facets={ + "documentation": DocumentationJobFacet(description="dag desc"), + "ownership": OwnershipJobFacet( + owners=[ + OwnershipJobFacetOwners(name="airflow", type=None), + ] + ), + **job_facets, "jobType": JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" - ) + ), }, ), producer=_PRODUCER, @@ -582,10 +613,11 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat mock_stats_timer.assert_called_with("ol.emit.attempts") +@mock.patch.object(DagRun, "get_task_instances") @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") -def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid): +def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) @@ -593,15 +625,30 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta dag_id = 
"dag_id" run_id = str(uuid.uuid4()) - dagrun_mock = MagicMock() - dagrun_mock.start_date = event_time - dagrun_mock.end_date = event_time - dagrun_mock.run_id = run_id - dagrun_mock.dag_id = dag_id + with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)): + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") + task_1 = BashOperator(task_id="task_1", bash_command="exit 0;") + task_2 = EmptyOperator( + task_id="task_2.test", + ) + + dag_run = DagRun( + dag_id=dag_id, + run_id=run_id, + start_date=event_time, + execution_date=event_time, + ) + dag_run._state = DagRunState.SUCCESS + dag_run.end_date = event_time + mocked_get_tasks.return_value = [ + TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), + TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), + TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), + ] generate_static_uuid.return_value = random_uuid adapter.dag_success( - dag_run=dagrun_mock, + dag_run=dag_run, msg="", ) @@ -610,10 +657,22 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta RunEvent( eventType=RunState.COMPLETE, eventTime=event_time.isoformat(), - run=Run(runId=random_uuid, facets={}), + run=Run( + runId=random_uuid, + facets={ + "airflowState": AirflowStateRunFacet( + dagRunState=DagRunState.SUCCESS, + tasksState={ + task_0.task_id: TaskInstanceState.SUCCESS, + task_1.task_id: TaskInstanceState.SKIPPED, + task_2.task_id: TaskInstanceState.FAILED, + }, + ) + }, + ), job=Job( namespace=namespace(), - name="dag_id", + name=dag_id, facets={ "jobType": JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" @@ -632,10 +691,11 @@ def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, generate_sta mock_stats_timer.assert_called_with("ol.emit.attempts") +@mock.patch.object(DagRun, "get_task_instances") @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") 
@mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") -def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid): +def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) @@ -643,15 +703,28 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_stati dag_id = "dag_id" run_id = str(uuid.uuid4()) - dagrun_mock = MagicMock() - dagrun_mock.start_date = event_time - dagrun_mock.end_date = event_time - dagrun_mock.run_id = run_id - dagrun_mock.dag_id = dag_id + with DAG(dag_id=dag_id, start_date=datetime.datetime(2024, 6, 1)): + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") + task_1 = BashOperator(task_id="task_1", bash_command="exit 0;") + task_2 = EmptyOperator(task_id="task_2.test") + + dag_run = DagRun( + dag_id=dag_id, + run_id=run_id, + start_date=event_time, + execution_date=event_time, + ) + dag_run._state = DagRunState.SUCCESS + dag_run.end_date = event_time + mocked_get_tasks.return_value = [ + TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), + TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), + TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), + ] generate_static_uuid.return_value = random_uuid adapter.dag_failed( - dag_run=dagrun_mock, + dag_run=dag_run, msg="error msg", ) @@ -665,12 +738,20 @@ def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, generate_stati facets={ "errorMessage": ErrorMessageRunFacet( message="error msg", programmingLanguage="python" - ) + ), + "airflowState": AirflowStateRunFacet( + dagRunState=DagRunState.SUCCESS, + tasksState={ + task_0.task_id: TaskInstanceState.SUCCESS, + task_1.task_id: TaskInstanceState.SKIPPED, + 
task_2.task_id: TaskInstanceState.FAILED, + }, + ), }, ), job=Job( namespace=namespace(), - name="dag_id", + name=dag_id, facets={ "jobType": JobTypeJobFacet( processingType="BATCH", integration="AIRFLOW", jobType="DAG" diff --git a/tests/providers/openlineage/utils/test_utils.py b/tests/providers/openlineage/utils/test_utils.py index ce1cd3be7eb8b..d58be508d4e04 100644 --- a/tests/providers/openlineage/utils/test_utils.py +++ b/tests/providers/openlineage/utils/test_utils.py @@ -17,34 +17,443 @@ # under the License. from __future__ import annotations -import pytest +import datetime +from unittest.mock import MagicMock -from airflow.decorators import task_group -from airflow.models.taskinstance import TaskInstance as TI +from airflow import DAG +from airflow.models.mappedoperator import MappedOperator +from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator -from airflow.providers.openlineage.plugins.facets import AirflowMappedTaskRunFacet -from airflow.providers.openlineage.utils.utils import get_custom_facets -from airflow.utils import timezone +from airflow.operators.python import PythonOperator +from airflow.providers.openlineage.plugins.facets import AirflowJobFacet +from airflow.providers.openlineage.utils.utils import ( + _get_parsed_dag_tree, + _get_task_groups_details, + _get_tasks_details, + _safe_get_dag_tree_view, + get_airflow_job_facet, + get_fully_qualified_class_name, + get_job_name, + get_operator_class, +) +from airflow.serialization.serialized_objects import SerializedBaseOperator +from airflow.utils.task_group import TaskGroup +from tests.test_utils.mock_operators import MockOperator -DEFAULT_DATE = timezone.datetime(2016, 1, 1) +class CustomOperatorForTest(BashOperator): + pass -@pytest.mark.db_test -def test_get_custom_facets(dag_maker): - with dag_maker(dag_id="dag_test_get_custom_facets") as dag: - @task_group - def task_group_op(k): - EmptyOperator(task_id="empty_operator") +class 
CustomOperatorFromEmpty(EmptyOperator): + pass - task_group_op.expand(k=[0]) - dag_maker.create_dagrun() - ti_0 = TI(dag.get_task("task_group_op.empty_operator"), execution_date=DEFAULT_DATE, map_index=0) +def test_get_airflow_job_facet(): + with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag: + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") - assert ti_0.map_index == 0 + with TaskGroup("section_1", prefix_group_id=True): + task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1) - assert get_custom_facets(ti_0)["airflow_mappedTask"] == AirflowMappedTaskRunFacet( - mapIndex=0, - operatorClass=f"{ti_0.task.operator_class.__module__}.{ti_0.task.operator_class.__name__}", + task_0 >> task_10 + + dagrun_mock = MagicMock() + dagrun_mock.dag = dag + + result = get_airflow_job_facet(dagrun_mock) + assert result == { + "airflow": AirflowJobFacet( + taskTree={"task_0": {"section_1.task_3": {}}}, + taskGroups={ + "section_1": { + "parent_group": None, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "section_1", + } + }, + tasks={ + "task_0": { + "operator": "airflow.operators.bash.BashOperator", + "task_group": None, + "emits_ol_events": True, + "ui_color": "#f0ede4", + "ui_fgcolor": "#000", + "ui_label": "task_0", + "is_setup": False, + "is_teardown": False, + }, + "section_1.task_3": { + "operator": "airflow.operators.python.PythonOperator", + "task_group": "section_1", + "emits_ol_events": True, + "ui_color": "#ffefeb", + "ui_fgcolor": "#000", + "ui_label": "task_3", + "is_setup": False, + "is_teardown": False, + }, + }, ) + } + + +def test_get_fully_qualified_class_name_serialized_operator(): + op_module_path = "airflow.operators.bash" + op_name = "BashOperator" + + op = BashOperator(task_id="test", bash_command="echo 1") + op_path_before_serialization = get_fully_qualified_class_name(op) + assert op_path_before_serialization == f"{op_module_path}.{op_name}" + + serialized = 
SerializedBaseOperator.serialize_operator(op) + deserialized = SerializedBaseOperator.deserialize_operator(serialized) + + op_path_after_deserialization = get_fully_qualified_class_name(deserialized) + assert op_path_after_deserialization == f"{op_module_path}.{op_name}" + assert deserialized._task_module == op_module_path + assert deserialized._task_type == op_name + + +def test_get_fully_qualified_class_name_mapped_operator(): + mapped = MockOperator.partial(task_id="task_2").expand(arg2=["a", "b", "c"]) + assert isinstance(mapped, MappedOperator) + mapped_op_path = get_fully_qualified_class_name(mapped) + assert mapped_op_path == "tests.test_utils.mock_operators.MockOperator" + + +def test_get_fully_qualified_class_name_bash_operator(): + result = get_fully_qualified_class_name(BashOperator(task_id="test", bash_command="echo 0;")) + expected_result = "airflow.operators.bash.BashOperator" + assert result == expected_result + + +def test_get_job_name(): + task_instance = MagicMock(dag_id="example_dag", task_id="example_task") + expected_result = "example_dag.example_task" + assert get_job_name(task_instance) == expected_result + + +def test_get_job_name_empty_ids(): + task_instance = MagicMock(dag_id="", task_id="") + expected_result = "." 
+ assert get_job_name(task_instance) == expected_result + + +def test_get_operator_class(): + op_class = get_operator_class(BashOperator(task_id="test", bash_command="echo 0;")) + assert op_class == BashOperator + + +def test_get_operator_class_mapped_operator(): + mapped = MockOperator.partial(task_id="task").expand(arg2=["a", "b", "c"]) + assert isinstance(mapped, MappedOperator) + op_class = get_operator_class(mapped) + assert op_class == MockOperator + + +def test_get_tasks_details(): + with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag: + task = CustomOperatorForTest(task_id="task", bash_command="exit 0;") # noqa: F841 + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") # noqa: F841 + task_1 = CustomOperatorFromEmpty(task_id="task_1") # noqa: F841 + task_2 = PythonOperator(task_id="task_2", python_callable=lambda: 1) # noqa: F841 + task_3 = BashOperator(task_id="task_3", bash_command="exit 0;") # noqa: F841 + task_4 = EmptyOperator(task_id="task_4.test.dot") # noqa: F841 + task_5 = BashOperator(task_id="task_5", bash_command="exit 0;") # noqa: F841 + + with TaskGroup("section_1", prefix_group_id=True) as tg: + task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1) # noqa: F841 + with TaskGroup("section_2", parent_group=tg) as tg2: + task_11 = EmptyOperator(task_id="task_11") # noqa: F841 + with TaskGroup("section_3", parent_group=tg2): + task_12 = PythonOperator(task_id="task_12", python_callable=lambda: 1) # noqa: F841 + + expected = { + "task": { + "operator": "tests.providers.openlineage.utils.test_utils.CustomOperatorForTest", + "task_group": None, + "emits_ol_events": True, + "ui_color": CustomOperatorForTest.ui_color, + "ui_fgcolor": CustomOperatorForTest.ui_fgcolor, + "ui_label": "task", + "is_setup": False, + "is_teardown": False, + }, + "task_0": { + "operator": "airflow.operators.bash.BashOperator", + "task_group": None, + "emits_ol_events": True, + "ui_color": BashOperator.ui_color, + "ui_fgcolor": 
BashOperator.ui_fgcolor, + "ui_label": "task_0", + "is_setup": False, + "is_teardown": False, + }, + "task_1": { + "operator": "tests.providers.openlineage.utils.test_utils.CustomOperatorFromEmpty", + "task_group": None, + "emits_ol_events": False, + "ui_color": CustomOperatorFromEmpty.ui_color, + "ui_fgcolor": CustomOperatorFromEmpty.ui_fgcolor, + "ui_label": "task_1", + "is_setup": False, + "is_teardown": False, + }, + "task_2": { + "operator": "airflow.operators.python.PythonOperator", + "task_group": None, + "emits_ol_events": True, + "ui_color": PythonOperator.ui_color, + "ui_fgcolor": PythonOperator.ui_fgcolor, + "ui_label": "task_2", + "is_setup": False, + "is_teardown": False, + }, + "task_3": { + "operator": "airflow.operators.bash.BashOperator", + "task_group": None, + "emits_ol_events": True, + "ui_color": BashOperator.ui_color, + "ui_fgcolor": BashOperator.ui_fgcolor, + "ui_label": "task_3", + "is_setup": False, + "is_teardown": False, + }, + "task_4.test.dot": { + "operator": "airflow.operators.empty.EmptyOperator", + "task_group": None, + "emits_ol_events": False, + "ui_color": EmptyOperator.ui_color, + "ui_fgcolor": EmptyOperator.ui_fgcolor, + "ui_label": "task_4.test.dot", + "is_setup": False, + "is_teardown": False, + }, + "task_5": { + "operator": "airflow.operators.bash.BashOperator", + "task_group": None, + "emits_ol_events": True, + "ui_color": BashOperator.ui_color, + "ui_fgcolor": BashOperator.ui_fgcolor, + "ui_label": "task_5", + "is_setup": False, + "is_teardown": False, + }, + "section_1.task_3": { + "operator": "airflow.operators.python.PythonOperator", + "task_group": "section_1", + "emits_ol_events": True, + "ui_color": PythonOperator.ui_color, + "ui_fgcolor": PythonOperator.ui_fgcolor, + "ui_label": "task_3", + "is_setup": False, + "is_teardown": False, + }, + "section_1.section_2.task_11": { + "operator": "airflow.operators.empty.EmptyOperator", + "task_group": "section_1.section_2", + "emits_ol_events": False, + "ui_color": 
EmptyOperator.ui_color, + "ui_fgcolor": EmptyOperator.ui_fgcolor, + "ui_label": "task_11", + "is_setup": False, + "is_teardown": False, + }, + "section_1.section_2.section_3.task_12": { + "operator": "airflow.operators.python.PythonOperator", + "task_group": "section_1.section_2.section_3", + "emits_ol_events": True, + "ui_color": PythonOperator.ui_color, + "ui_fgcolor": PythonOperator.ui_fgcolor, + "ui_label": "task_12", + "is_setup": False, + "is_teardown": False, + }, + } + + result = _get_tasks_details(dag) + assert result == expected + + +def test_get_tasks_details_empty_dag(): + assert _get_tasks_details(DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))) == {} + + +def test_dag_tree_level_indent(): + """Tests the correct indentation of tasks in a DAG tree view. + + Test verifies that the tree view of the DAG correctly represents the hierarchical structure + of the tasks with proper indentation. The expected indentation increases by 4 spaces for each + subsequent level in the DAG. The test asserts that the generated tree view matches the expected + lines with correct indentation. 
+ """ + with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag: + task_0 = EmptyOperator(task_id="task_0") + task_1 = EmptyOperator(task_id="task_1") + task_2 = EmptyOperator(task_id="task_2") + task_3 = EmptyOperator(task_id="task_3") + + task_0 >> task_1 >> task_2 + task_3 >> task_2 + + indent = 4 * " " + expected_lines = [ + "<Task(EmptyOperator): task_0>", + indent + "<Task(EmptyOperator): task_1>", + 2 * indent + "<Task(EmptyOperator): task_2>", + "<Task(EmptyOperator): task_3>", + indent + "<Task(EmptyOperator): task_2>", + ] + assert _safe_get_dag_tree_view(dag) == expected_lines + + +def test_get_dag_tree(): + with DAG(dag_id="dag", start_date=datetime.datetime(2024, 6, 1)) as dag: + task = CustomOperatorForTest(task_id="task", bash_command="exit 0;") + task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") + task_1 = BashOperator(task_id="task_1", bash_command="exit 1;") + task_2 = PythonOperator(task_id="task_2", python_callable=lambda: 1) + task_3 = BashOperator(task_id="task_3", bash_command="exit 0;") + task_4 = EmptyOperator( + task_id="task_4", + ) + task_5 = BashOperator(task_id="task_5", bash_command="exit 0;") + task_6 = EmptyOperator(task_id="task_6.test5") + task_7 = BashOperator(task_id="task_7", bash_command="exit 0;") + task_8 = PythonOperator(task_id="task_8", python_callable=lambda: 1) # noqa: F841 + task_9 = BashOperator(task_id="task_9", bash_command="exit 0;") + + with TaskGroup("section_1", prefix_group_id=True) as tg: + task_10 = PythonOperator(task_id="task_3", python_callable=lambda: 1) + with TaskGroup("section_2", parent_group=tg) as tg2: + task_11 = EmptyOperator(task_id="task_11") # noqa: F841 + with TaskGroup("section_3", parent_group=tg2): + task_12 = PythonOperator(task_id="task_12", python_callable=lambda: 1) + + task >> [task_2, task_7] + task_0 >> [task_2, task_1] >> task_3 >> [task_4, task_5] >> task_6 + task_1 >> task_9 >> task_3 >> task_4 >> task_5 >> task_6 + task_3 >> task_10 >> task_12 + + expected = { + "section_1.section_2.task_11": {}, + "task": { + "task_2": { + "task_3": { + "section_1.task_3":
{"section_1.section_2.section_3.task_12": {}}, + "task_4": {"task_5": {"task_6.test5": {}}, "task_6.test5": {}}, + "task_5": {"task_6.test5": {}}, + } + }, + "task_7": {}, + }, + "task_0": { + "task_1": { + "task_3": { + "section_1.task_3": {"section_1.section_2.section_3.task_12": {}}, + "task_4": {"task_5": {"task_6.test5": {}}, "task_6.test5": {}}, + "task_5": {"task_6.test5": {}}, + }, + "task_9": { + "task_3": { + "section_1.task_3": {"section_1.section_2.section_3.task_12": {}}, + "task_4": {"task_5": {"task_6.test5": {}}, "task_6.test5": {}}, + "task_5": {"task_6.test5": {}}, + } + }, + }, + "task_2": { + "task_3": { + "section_1.task_3": {"section_1.section_2.section_3.task_12": {}}, + "task_4": {"task_5": {"task_6.test5": {}}, "task_6.test5": {}}, + "task_5": {"task_6.test5": {}}, + } + }, + }, + "task_8": {}, + } + result = _get_parsed_dag_tree(dag) + assert result == expected + + +def test_get_dag_tree_empty_dag(): + assert _get_parsed_dag_tree(DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))) == {} + + +def test_get_task_groups_details(): + with DAG("test_dag", start_date=datetime.datetime(2024, 6, 1)) as dag: + with TaskGroup("tg1", prefix_group_id=True): + task_1 = EmptyOperator(task_id="task_1") # noqa: F841 + with TaskGroup("tg2", prefix_group_id=False): + task = EmptyOperator(task_id="task_1") # noqa: F841 + with TaskGroup("tg3"): + task_2 = EmptyOperator(task_id="task_2") # noqa: F841 + + result = _get_task_groups_details(dag) + expected = { + "tg1": { + "parent_group": None, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg1", + }, + "tg2": { + "parent_group": None, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg2", + }, + "tg3": { + "parent_group": None, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg3", + }, + } + + assert result == expected + + +def test_get_task_groups_details_nested(): + with DAG("test_dag", 
start_date=datetime.datetime(2024, 6, 1)) as dag: + with TaskGroup("tg1", prefix_group_id=True) as tg: + with TaskGroup("tg2", parent_group=tg) as tg2: + with TaskGroup("tg3", parent_group=tg2): + pass + + result = _get_task_groups_details(dag) + expected = { + "tg1": { + "parent_group": None, + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg1", + }, + "tg1.tg2": { + "parent_group": "tg1", + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg2", + }, + "tg1.tg2.tg3": { + "parent_group": "tg1.tg2", + "tooltip": "", + "ui_color": "CornflowerBlue", + "ui_fgcolor": "#000", + "ui_label": "tg3", + }, + } + + assert result == expected + + +def test_get_task_groups_details_no_task_groups(): + assert _get_task_groups_details(DAG("test_dag", start_date=datetime.datetime(2024, 6, 1))) == {}