Skip to content

Commit

Permalink
feat: notify about potential serialization failures when sending DagR…
Browse files Browse the repository at this point in the history
…un, don't serialize unnecessary params, guard listener for exceptions

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 2, 2024
1 parent 8640f3e commit c55a7ab
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 171 deletions.
72 changes: 47 additions & 25 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_state_run_facet,
)
Expand All @@ -50,9 +49,9 @@
if TYPE_CHECKING:
from datetime import datetime

from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
from airflow.utils.state import DagRunState

_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"

Expand Down Expand Up @@ -336,33 +335,36 @@ def fail_task(

def dag_started(
self,
dag_run: DagRun,
msg: str,
dag_id: str,
logical_date: datetime,
start_date: datetime,
nominal_start_time: str,
nominal_end_time: str,
owners: list[str],
run_facets: dict[str, RunFacet],
description: str | None = None,
job_facets: dict[str, JobFacet] | None = None, # Custom job facets
):
try:
owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
eventTime=start_date.isoformat(),
job=self._build_job(
job_name=dag_run.dag_id,
job_name=dag_id,
job_type=_JOB_TYPE_DAG,
job_description=dag_run.dag.description if dag_run.dag else None,
owners=owner,
job_description=description,
owners=owners,
job_facets=job_facets,
),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
job_name=dag_run.dag_id,
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**get_airflow_dag_run_facet(dag_run), **get_airflow_debug_facet()},
run_facets={**run_facets, **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
Expand All @@ -375,18 +377,29 @@ def dag_started(
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG started event: \n %s", traceback.format_exc())

def dag_success(self, dag_run: DagRun, msg: str):
def dag_success(
self,
dag_id: str,
run_id: str,
end_date: datetime,
logical_date: datetime,
dag_run_state: DagRunState,
task_ids: list[str],
):
try:
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()},
facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
},
),
inputs=[],
outputs=[],
Expand All @@ -399,22 +412,31 @@ def dag_success(self, dag_run: DagRun, msg: str):
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG success event: \n %s", traceback.format_exc())

def dag_failed(self, dag_run: DagRun, msg: str):
def dag_failed(
self,
dag_id: str,
run_id: str,
end_date: datetime,
logical_date: datetime,
dag_run_state: DagRunState,
task_ids: list[str],
msg: str,
):
try:
event = RunEvent(
eventType=RunState.FAIL,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=msg, programmingLanguage="python"
),
**get_airflow_state_run_facet(dag_run),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
},
),
Expand Down
149 changes: 103 additions & 46 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@

from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagRun
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
IS_AIRFLOW_2_10_OR_HIGHER,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_job_facet,
get_airflow_mapped_task_facet,
Expand All @@ -51,7 +53,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models import DagRun, TaskInstance
from airflow.models import TaskInstance

_openlineage_listener: OpenLineageListener | None = None

Expand Down Expand Up @@ -413,65 +415,120 @@ def before_stopping(self, component) -> None:

@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

data_interval_start = (
dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
)
return
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return
run_facets = {**get_airflow_dag_run_facet(dag_run)}

data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
self.adapter.dag_started,
dag_run=dag_run,
msg=msg,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
job_facets=get_airflow_job_facet(dag_run=dag_run),
)
self.submit_callable(
self.adapter.dag_started,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
logical_date=dag_run.logical_date,
start_date=dag_run.start_date,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
run_facets=run_facets,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
description=dag_run.dag.description if dag_run.dag else None,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
job_facets=get_airflow_job_facet(dag_run=dag_run),
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_running", exc_info=e)

@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return

self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
if IS_AIRFLOW_2_10_OR_HIGHER:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
self.submit_callable(
self.adapter.dag_success,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return

if IS_AIRFLOW_2_10_OR_HIGHER:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
self.submit_callable(
self.adapter.dag_failed,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
)
return
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
def submit_callable(self, callable, *args, **kwargs):
fut = self.executor.submit(callable, *args, **kwargs)
fut.add_done_callback(self.log_submit_error)
return fut

self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
def log_submit_error(self, fut):
if fut.exception():
self.log.warning("Failed to submit method to executor", exc_info=fut.exception())
else:
self.log.debug("Successfully submitted method to executor")


def get_openlineage_listener() -> OpenLineageListener:
Expand Down
16 changes: 9 additions & 7 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow import __version__ as AIRFLOW_VERSION
from airflow.datasets import Dataset
from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
Expand All @@ -58,9 +58,8 @@
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet

from airflow.models import DagRun, TaskInstance
from airflow.utils.state import TaskInstanceState

from airflow.models import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState

log = logging.getLogger(__name__)
_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
Expand Down Expand Up @@ -439,11 +438,14 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:
}


def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, AirflowStateRunFacet]:
def get_airflow_state_run_facet(
dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState
) -> dict[str, AirflowStateRunFacet]:
tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids)
return {
"airflowState": AirflowStateRunFacet(
dagRunState=dag_run.get_state(),
tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()},
dagRunState=dag_run_state,
tasksState={ti.task_id: ti.state for ti in tis},
)
}

Expand Down
Loading

0 comments on commit c55a7ab

Please sign in to comment.