From 0414443b772e7a812a5533bd9ecce950016f5ca4 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 9 Oct 2024 18:24:38 -0700 Subject: [PATCH] feat(airflow): add a `render_templates` config parameter (#11537) --- docs/lineage/airflow.md | 1 + .../src/datahub_airflow_plugin/_config.py | 19 +++++++++++++------ .../datahub_listener.py | 6 ++++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 65da1fd5251dc9..aca6d30619ea8c 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -141,6 +141,7 @@ conn_id = datahub_rest_default # or datahub_kafka_default | capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. | | capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. | | materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. | +| render_templates | true | If true, jinja-templated fields will be automatically rendered to improve the accuracy of SQL statement extraction. | | datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. | | | | graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. | diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py index c37a1b334ed377..8deba22a107ce7 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py @@ -43,19 +43,24 @@ class DatahubLineageConfig(ConfigModel): capture_executions: bool = False + datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE + + # Note that this field is only respected by the lineage backend. + # The Airflow plugin v2 behaves as if it were set to True. + graceful_exceptions: bool = True + + # The remaining config fields are only relevant for the v2 plugin. enable_extractors: bool = True + # If true, ti.render_templates() will be called in the listener. + # Makes extraction of jinja-templated fields more accurate. + render_templates: bool = True + log_level: Optional[str] = None debug_emitter: bool = False disable_openlineage_plugin: bool = True - # Note that this field is only respected by the lineage backend. - # The Airflow plugin behaves as if it were set to True. - graceful_exceptions: bool = True - - datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE - def make_emitter_hook(self) -> "DatahubGenericHook": # This is necessary to avoid issues with circular imports. from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig: disable_openlineage_plugin = conf.get( "datahub", "disable_openlineage_plugin", fallback=True ) + render_templates = conf.get("datahub", "render_templates", fallback=True) datajob_url_link = conf.get( "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value ) @@ -102,4 +108,5 @@ def get_lineage_config() -> DatahubLineageConfig: debug_emitter=debug_emitter, disable_openlineage_plugin=disable_openlineage_plugin, datajob_url_link=datajob_url_link, + render_templates=render_templates, ) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 123b74fee74b5d..b818b76de9f7f9 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -386,7 +386,8 @@ def on_task_instance_running( f"DataHub listener got notification about task instance start for {task_instance.task_id}" ) - task_instance = _render_templates(task_instance) + if self.config.render_templates: + task_instance = _render_templates(task_instance) # The type ignore is to placate mypy on Airflow 2.1.x. dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] @@ -478,7 +479,8 @@ def on_task_instance_finish( ) -> None: dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined] - task_instance = _render_templates(task_instance) + if self.config.render_templates: + task_instance = _render_templates(task_instance) # We must prefer the task attribute, in case modifications to the task's inlets/outlets # were made by the execute() method.