From c55a7ab43b50f6e9c388c31608f996d834b25e0c Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Fri, 23 Aug 2024 10:54:44 +0200 Subject: [PATCH] feat: notify about potential serialization failures when sending DagRun, don't serialize unnecessary params, guard listener for exceptions Signed-off-by: Maciej Obuchowski --- .../providers/openlineage/plugins/adapter.py | 72 ++++--- .../providers/openlineage/plugins/listener.py | 149 +++++++++---- airflow/providers/openlineage/utils/utils.py | 16 +- .../openlineage/plugins/test_adapter.py | 202 ++++++++++-------- .../openlineage/plugins/test_listener.py | 31 ++- 5 files changed, 299 insertions(+), 171 deletions(-) diff --git a/airflow/providers/openlineage/plugins/adapter.py b/airflow/providers/openlineage/plugins/adapter.py index 847de5953df9b..8cd6e6c605b45 100644 --- a/airflow/providers/openlineage/plugins/adapter.py +++ b/airflow/providers/openlineage/plugins/adapter.py @@ -40,7 +40,6 @@ from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf from airflow.providers.openlineage.utils.utils import ( OpenLineageRedactor, - get_airflow_dag_run_facet, get_airflow_debug_facet, get_airflow_state_run_facet, ) @@ -50,9 +49,9 @@ if TYPE_CHECKING: from datetime import datetime - from airflow.models.dagrun import DagRun from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils.log.secrets_masker import SecretsMasker + from airflow.utils.state import DagRunState _PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}" @@ -336,33 +335,36 @@ def fail_task( def dag_started( self, - dag_run: DagRun, - msg: str, + dag_id: str, + logical_date: datetime, + start_date: datetime, nominal_start_time: str, nominal_end_time: str, + owners: list[str], + run_facets: dict[str, RunFacet], + description: str | None = None, job_facets: dict[str, JobFacet] | None = None, # Custom job facets ): try: - owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None event = RunEvent( eventType=RunState.START, - eventTime=dag_run.start_date.isoformat(), + eventTime=start_date.isoformat(), job=self._build_job( - job_name=dag_run.dag_id, + job_name=dag_id, job_type=_JOB_TYPE_DAG, - job_description=dag_run.dag.description if dag_run.dag else None, - owners=owner, + job_description=description, + owners=owners, job_facets=job_facets, ), run=self._build_run( run_id=self.build_dag_run_id( - dag_id=dag_run.dag_id, - logical_date=dag_run.logical_date, + dag_id=dag_id, + logical_date=logical_date, ), - job_name=dag_run.dag_id, + job_name=dag_id, nominal_start_time=nominal_start_time, nominal_end_time=nominal_end_time, - run_facets={**get_airflow_dag_run_facet(dag_run), **get_airflow_debug_facet()}, + run_facets={**run_facets, **get_airflow_debug_facet()}, ), inputs=[], outputs=[], @@ -375,18 +377,29 @@ def dag_started( # This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing. self.log.warning("Failed to emit DAG started event: \n %s", traceback.format_exc()) - def dag_success(self, dag_run: DagRun, msg: str): + def dag_success( + self, + dag_id: str, + run_id: str, + end_date: datetime, + logical_date: datetime, + dag_run_state: DagRunState, + task_ids: list[str], + ): try: event = RunEvent( eventType=RunState.COMPLETE, - eventTime=dag_run.end_date.isoformat(), - job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG), + eventTime=end_date.isoformat(), + job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG), run=Run( runId=self.build_dag_run_id( - dag_id=dag_run.dag_id, - logical_date=dag_run.logical_date, + dag_id=dag_id, + logical_date=logical_date, ), - facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()}, + facets={ + **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), + **get_airflow_debug_facet(), + }, ), inputs=[], outputs=[], @@ -399,22 +412,31 @@ def dag_success(self, dag_run: DagRun, msg: str): # This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing. self.log.warning("Failed to emit DAG success event: \n %s", traceback.format_exc()) - def dag_failed(self, dag_run: DagRun, msg: str): + def dag_failed( + self, + dag_id: str, + run_id: str, + end_date: datetime, + logical_date: datetime, + dag_run_state: DagRunState, + task_ids: list[str], + msg: str, + ): try: event = RunEvent( eventType=RunState.FAIL, - eventTime=dag_run.end_date.isoformat(), - job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG), + eventTime=end_date.isoformat(), + job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG), run=Run( runId=self.build_dag_run_id( - dag_id=dag_run.dag_id, - logical_date=dag_run.logical_date, + dag_id=dag_id, + logical_date=logical_date, ), facets={ "errorMessage": error_message_run.ErrorMessageRunFacet( message=msg, programmingLanguage="python" ), - **get_airflow_state_run_facet(dag_run), + **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), }, ), diff --git a/airflow/providers/openlineage/plugins/listener.py b/airflow/providers/openlineage/plugins/listener.py index 7882e188d943d..fbe50e4a728a5 100644 --- a/airflow/providers/openlineage/plugins/listener.py +++ b/airflow/providers/openlineage/plugins/listener.py @@ -27,11 +27,13 @@ from airflow import settings from airflow.listeners import hookimpl +from airflow.models import DagRun from airflow.providers.openlineage import conf from airflow.providers.openlineage.extractors import ExtractorManager from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState from airflow.providers.openlineage.utils.utils import ( IS_AIRFLOW_2_10_OR_HIGHER, + get_airflow_dag_run_facet, get_airflow_debug_facet, get_airflow_job_facet, get_airflow_mapped_task_facet, @@ -51,7 +53,7 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session - from airflow.models import DagRun, TaskInstance + from airflow.models import TaskInstance _openlineage_listener: OpenLineageListener | None = None @@ -413,65 +415,120 @@ def before_stopping(self, component) -> None: @hookimpl def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None: - if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): - self.log.debug( - "Skipping OpenLineage event emission for DAG `%s` " - "due to lack of explicit lineage enablement for DAG while " - "[openlineage] selective_enable is on.", - dag_run.dag_id, + try: + if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): + self.log.debug( + "Skipping OpenLineage event emission for DAG `%s` " + "due to lack of explicit lineage enablement for DAG while " + "[openlineage] selective_enable is on.", + dag_run.dag_id, + ) + return + + if not self.executor: + self.log.debug("Executor have not started before `on_dag_run_running`") + return + + data_interval_start = ( + dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None ) - return + data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None - if not self.executor: - self.log.debug("Executor have not started before `on_dag_run_running`") - return + run_facets = {**get_airflow_dag_run_facet(dag_run)} - data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None - data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None - self.executor.submit( - self.adapter.dag_started, - dag_run=dag_run, - msg=msg, - nominal_start_time=data_interval_start, - nominal_end_time=data_interval_end, - # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects, - # as it causes lack of some TaskGroup attributes and crashes event emission. - job_facets=get_airflow_job_facet(dag_run=dag_run), - ) + self.submit_callable( + self.adapter.dag_started, + dag_id=dag_run.dag_id, + run_id=dag_run.run_id, + logical_date=dag_run.logical_date, + start_date=dag_run.start_date, + nominal_start_time=data_interval_start, + nominal_end_time=data_interval_end, + run_facets=run_facets, + owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None, + description=dag_run.dag.description if dag_run.dag else None, + # AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects, + # as it causes lack of some TaskGroup attributes and crashes event emission. + job_facets=get_airflow_job_facet(dag_run=dag_run), + ) + except BaseException as e: + self.log.warning("OpenLineage received exception in method on_dag_run_running", exc_info=e) @hookimpl def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None: - if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): - self.log.debug( - "Skipping OpenLineage event emission for DAG `%s` " - "due to lack of explicit lineage enablement for DAG while " - "[openlineage] selective_enable is on.", - dag_run.dag_id, - ) - return + try: + if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): + self.log.debug( + "Skipping OpenLineage event emission for DAG `%s` " + "due to lack of explicit lineage enablement for DAG while " + "[openlineage] selective_enable is on.", + dag_run.dag_id, + ) + return - if not self.executor: - self.log.debug("Executor have not started before `on_dag_run_success`") - return + if not self.executor: + self.log.debug("Executor have not started before `on_dag_run_success`") + return - self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg) + if IS_AIRFLOW_2_10_OR_HIGHER: + task_ids = DagRun._get_partial_task_ids(dag_run.dag) + else: + task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + self.submit_callable( + self.adapter.dag_success, + dag_id=dag_run.dag_id, + run_id=dag_run.run_id, + end_date=dag_run.end_date, + logical_date=dag_run.logical_date, + task_ids=task_ids, + dag_run_state=dag_run.get_state(), + ) + except BaseException as e: + self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e) @hookimpl def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None: - if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): - self.log.debug( - "Skipping OpenLineage event emission for DAG `%s` " - "due to lack of explicit lineage enablement for DAG while " - "[openlineage] selective_enable is on.", - dag_run.dag_id, + try: + if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag): + self.log.debug( + "Skipping OpenLineage event emission for DAG `%s` " + "due to lack of explicit lineage enablement for DAG while " + "[openlineage] selective_enable is on.", + dag_run.dag_id, + ) + return + + if not self.executor: + self.log.debug("Executor have not started before `on_dag_run_failed`") + return + + if IS_AIRFLOW_2_10_OR_HIGHER: + task_ids = DagRun._get_partial_task_ids(dag_run.dag) + else: + task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None + self.submit_callable( + self.adapter.dag_failed, + dag_id=dag_run.dag_id, + run_id=dag_run.run_id, + end_date=dag_run.end_date, + logical_date=dag_run.logical_date, + dag_run_state=dag_run.get_state(), + task_ids=task_ids, + msg=msg, ) - return + except BaseException as e: + self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e) - if not self.executor: - self.log.debug("Executor have not started before `on_dag_run_failed`") - return + def submit_callable(self, callable, *args, **kwargs): + fut = self.executor.submit(callable, *args, **kwargs) + fut.add_done_callback(self.log_submit_error) + return fut - self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg) + def log_submit_error(self, fut): + if fut.exception(): + self.log.warning("Failed to submit method to executor", exc_info=fut.exception()) + else: + self.log.debug("Successfully submitted method to executor") def get_openlineage_listener() -> OpenLineageListener: diff --git a/airflow/providers/openlineage/utils/utils.py b/airflow/providers/openlineage/utils/utils.py index ec58c6e2d743e..f283c09e8759c 100644 --- a/airflow/providers/openlineage/utils/utils.py +++ b/airflow/providers/openlineage/utils/utils.py @@ -33,7 +33,7 @@ from airflow import __version__ as AIRFLOW_VERSION from airflow.datasets import Dataset from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move this maybe to Airflow's logic? -from airflow.models import DAG, BaseOperator, MappedOperator +from airflow.models import DAG, BaseOperator, DagRun, MappedOperator from airflow.providers.openlineage import conf from airflow.providers.openlineage.plugins.facets import ( AirflowDagRunFacet, @@ -58,9 +58,8 @@ from openlineage.client.event_v2 import Dataset as OpenLineageDataset from openlineage.client.facet_v2 import RunFacet - from airflow.models import DagRun, TaskInstance - from airflow.utils.state import TaskInstanceState - + from airflow.models import TaskInstance + from airflow.utils.state import DagRunState, TaskInstanceState log = logging.getLogger(__name__) _NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -439,11 +438,14 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]: } -def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, AirflowStateRunFacet]: +def get_airflow_state_run_facet( + dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState +) -> dict[str, AirflowStateRunFacet]: + tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids) return { "airflowState": AirflowStateRunFacet( - dagRunState=dag_run.get_state(), - tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()}, + dagRunState=dag_run_state, + tasksState={ti.task_id: ti.state for ti in tis}, ) } diff --git a/tests/providers/openlineage/plugins/test_adapter.py b/tests/providers/openlineage/plugins/test_adapter.py index a1e702e263e2d..260883470875f 100644 --- a/tests/providers/openlineage/plugins/test_adapter.py +++ b/tests/providers/openlineage/plugins/test_adapter.py @@ -528,7 +528,7 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta mock_stats_timer.assert_called_with("ol.emit.attempts") -@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=False) +@mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") @@ -536,7 +536,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() adapter = OpenLineageAdapter(client) - event_time = datetime.datetime.now() + event_time = datetime.datetime.fromisoformat("2021-01-01T00:00:00+00:00") dag_id = "dag_id" run_id = str(uuid.uuid4()) @@ -564,14 +564,6 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat job_facets = {**get_airflow_job_facet(dag_run)} - adapter.dag_started( - dag_run=dag_run, - msg="", - nominal_start_time=event_time.isoformat(), - nominal_end_time=event_time.isoformat(), - job_facets=job_facets, - ) - expected_dag_info = { "timetable": {"delta": 86400.0}, "dag_id": dag_id, @@ -586,6 +578,32 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat else: # Airflow 3 and up. expected_dag_info["timetable_summary"] = "1 day, 0:00:00" + dag_run_facet = AirflowDagRunFacet( + dag=expected_dag_info, + dagRun={ + "conf": {}, + "dag_id": "dag_id", + "data_interval_start": event_time.isoformat(), + "data_interval_end": event_time.isoformat(), + "external_trigger": None, + "run_id": run_id, + "run_type": None, + "start_date": event_time.isoformat(), + }, + ) + + adapter.dag_started( + dag_id=dag_id, + start_date=event_time, + logical_date=event_time, + nominal_start_time=event_time.isoformat(), + nominal_end_time=event_time.isoformat(), + owners=["airflow"], + description=dag.description, + run_facets={"airflowDagRun": dag_run_facet}, + job_facets=job_facets, + ) + assert len(client.emit.mock_calls) == 1 client.emit.assert_called_once_with( RunEvent( @@ -611,7 +629,7 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat "start_date": event_time.isoformat(), }, ), - # "debug": AirflowDebugRunFacet(packages=ANY), + "debug": AirflowDebugRunFacet(packages=ANY), }, ), job=Job( @@ -635,18 +653,17 @@ def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, generate_stat outputs=[], ) ) - mock_stats_incr.assert_not_called() mock_stats_timer.assert_called_with("ol.emit.attempts") @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) -@mock.patch.object(DagRun, "get_task_instances") +@mock.patch.object(DagRun, "fetch_task_instances") @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") def test_emit_dag_complete_event( - mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, mock_debug_mode + mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, mock_debug_mode ): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() @@ -670,7 +687,7 @@ def test_emit_dag_complete_event( ) dag_run._state = DagRunState.SUCCESS dag_run.end_date = event_time - mocked_get_tasks.return_value = [ + mocked_fetch_tis.return_value = [ TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), @@ -678,44 +695,45 @@ def test_emit_dag_complete_event( generate_static_uuid.return_value = random_uuid adapter.dag_success( - dag_run=dag_run, - msg="", + dag_id=dag_id, + run_id=run_id, + end_date=event_time, + logical_date=event_time, + dag_run_state=DagRunState.SUCCESS, + task_ids=["task_0", "task_1", "task_2.test"], ) - assert ( - call( - RunEvent( - eventType=RunState.COMPLETE, - eventTime=event_time.isoformat(), - run=Run( - runId=random_uuid, - facets={ - "airflowState": AirflowStateRunFacet( - dagRunState=DagRunState.SUCCESS, - tasksState={ - task_0.task_id: TaskInstanceState.SUCCESS, - task_1.task_id: TaskInstanceState.SKIPPED, - task_2.task_id: TaskInstanceState.FAILED, - }, - ), - "debug": AirflowDebugRunFacet(packages=ANY), - }, - ), - job=Job( - namespace=namespace(), - name=dag_id, - facets={ - "jobType": job_type_job.JobTypeJobFacet( - processingType="BATCH", integration="AIRFLOW", jobType="DAG" - ) - }, - ), - producer=_PRODUCER, - inputs=[], - outputs=[], - ) + client.emit.assert_called_once_with( + RunEvent( + eventType=RunState.COMPLETE, + eventTime=event_time.isoformat(), + run=Run( + runId=random_uuid, + facets={ + "airflowState": AirflowStateRunFacet( + dagRunState=DagRunState.SUCCESS, + tasksState={ + task_0.task_id: TaskInstanceState.SUCCESS, + task_1.task_id: TaskInstanceState.SKIPPED, + task_2.task_id: TaskInstanceState.FAILED, + }, + ), + "debug": AirflowDebugRunFacet(packages=ANY), + }, + ), + job=Job( + namespace=namespace(), + name=dag_id, + facets={ + "jobType": job_type_job.JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ) + }, + ), + producer=_PRODUCER, + inputs=[], + outputs=[], ) - in client.emit.mock_calls ) mock_stats_incr.assert_not_called() @@ -723,12 +741,12 @@ def test_emit_dag_complete_event( @mock.patch("airflow.providers.openlineage.conf.debug_mode", return_value=True) -@mock.patch.object(DagRun, "get_task_instances") +@mock.patch.object(DagRun, "fetch_task_instances") @mock.patch("airflow.providers.openlineage.plugins.adapter.generate_static_uuid") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.timer") @mock.patch("airflow.providers.openlineage.plugins.adapter.Stats.incr") def test_emit_dag_failed_event( - mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_get_tasks, mock_debug_mode + mock_stats_incr, mock_stats_timer, generate_static_uuid, mocked_fetch_tis, mock_debug_mode ): random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e" client = MagicMock() @@ -748,9 +766,9 @@ def test_emit_dag_failed_event( start_date=event_time, execution_date=event_time, ) - dag_run._state = DagRunState.SUCCESS + dag_run._state = DagRunState.FAILED dag_run.end_date = event_time - mocked_get_tasks.return_value = [ + mocked_fetch_tis.return_value = [ TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), @@ -758,47 +776,49 @@ def test_emit_dag_failed_event( generate_static_uuid.return_value = random_uuid adapter.dag_failed( - dag_run=dag_run, + dag_id=dag_id, + run_id=run_id, + end_date=event_time, + logical_date=event_time, + dag_run_state=DagRunState.FAILED, + task_ids=["task_0", "task_1", "task_2.test"], msg="error msg", ) - assert ( - call( - RunEvent( - eventType=RunState.FAIL, - eventTime=event_time.isoformat(), - run=Run( - runId=random_uuid, - facets={ - "errorMessage": error_message_run.ErrorMessageRunFacet( - message="error msg", programmingLanguage="python" - ), - "airflowState": AirflowStateRunFacet( - dagRunState=DagRunState.SUCCESS, - tasksState={ - task_0.task_id: TaskInstanceState.SUCCESS, - task_1.task_id: TaskInstanceState.SKIPPED, - task_2.task_id: TaskInstanceState.FAILED, - }, - ), - "debug": AirflowDebugRunFacet(packages=ANY), - }, - ), - job=Job( - namespace=namespace(), - name=dag_id, - facets={ - "jobType": job_type_job.JobTypeJobFacet( - processingType="BATCH", integration="AIRFLOW", jobType="DAG" - ) - }, - ), - producer=_PRODUCER, - inputs=[], - outputs=[], - ) + client.emit.assert_called_once_with( + RunEvent( + eventType=RunState.FAIL, + eventTime=event_time.isoformat(), + run=Run( + runId=random_uuid, + facets={ + "errorMessage": error_message_run.ErrorMessageRunFacet( + message="error msg", programmingLanguage="python" + ), + "airflowState": AirflowStateRunFacet( + dagRunState=DagRunState.FAILED, + tasksState={ + task_0.task_id: TaskInstanceState.SUCCESS, + task_1.task_id: TaskInstanceState.SKIPPED, + task_2.task_id: TaskInstanceState.FAILED, + }, + ), + "debug": AirflowDebugRunFacet(packages=ANY), + }, + ), + job=Job( + namespace=namespace(), + name=dag_id, + facets={ + "jobType": job_type_job.JobTypeJobFacet( + processingType="BATCH", integration="AIRFLOW", jobType="DAG" + ) + }, + ), + producer=_PRODUCER, + inputs=[], + outputs=[], ) - in client.emit.mock_calls ) mock_stats_incr.assert_not_called() diff --git a/tests/providers/openlineage/plugins/test_listener.py b/tests/providers/openlineage/plugins/test_listener.py index de2732cb3656d..22e3f5d44b6f0 100644 --- a/tests/providers/openlineage/plugins/test_listener.py +++ b/tests/providers/openlineage/plugins/test_listener.py @@ -17,22 +17,27 @@ from __future__ import annotations import datetime as dt +import threading import uuid from contextlib import suppress from typing import Callable from unittest import mock -from unittest.mock import ANY, patch +from unittest.mock import ANY, MagicMock, patch import pandas as pd import pytest +from openlineage.client import OpenLineageClient +from openlineage.client.transport import ConsoleTransport +from openlineage.client.transport.console import ConsoleConfig from airflow.models import DAG, DagRun, TaskInstance from airflow.models.baseoperator import BaseOperator from airflow.operators.python import PythonOperator +from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet from airflow.providers.openlineage.plugins.listener import OpenLineageListener from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage -from airflow.utils.state import State +from airflow.utils.state import DagRunState, State from tests.test_utils.compat import AIRFLOW_V_2_10_PLUS from tests.test_utils.config import conf_vars @@ -588,6 +593,28 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size(mock_exec mock_executor.return_value.submit.assert_called_once() +def test_listener_logs_failed_serialization(): + listener = OpenLineageListener() + listener.log = MagicMock() + listener.adapter = OpenLineageAdapter( + client=OpenLineageClient(transport=ConsoleTransport(config=ConsoleConfig())) + ) + event_time = dt.datetime.now() + + fut = listener.submit_callable( + listener.adapter.dag_failed, + dag_id="", + run_id="", + end_date=event_time, + execution_date=threading.Thread(), + dag_run_state=DagRunState.FAILED, + task_ids=["task_id"], + msg="", + ) + assert fut.exception(10) + listener.log.warning.assert_called_once() + + class TestOpenLineageSelectiveEnable: def setup_method(self): self.dag = DAG(