From 093345cb61cea623bffe6af5a41d6ba417dfcf6f Mon Sep 17 00:00:00 2001 From: blag Date: Mon, 21 Nov 2022 18:10:47 -0800 Subject: [PATCH] Switch (back) to late imports (#27730) --- airflow/models/dag.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1b9dbfe326fbd..46f635a40652d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -77,7 +77,6 @@ from airflow.models.dagcode import DagCode from airflow.models.dagpickle import DagPickle from airflow.models.dagrun import DagRun -from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel from airflow.models.operator import Operator from airflow.models.param import DagParam, ParamsDict from airflow.models.taskinstance import Context, TaskInstance, TaskInstanceKey, clear_task_instances @@ -207,6 +206,8 @@ def get_dataset_triggered_next_run_info( Given a list of dag_ids, get string representing how close any that are dataset triggered are their next run, e.g. "1 of 2 datasets updated" """ + from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel + return { x.dag_id: { "uri": x.uri, @@ -3340,6 +3341,8 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. """ + from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ + # these dag ids are triggered by datasets, and they are ready to go. dataset_triggered_dag_info = { x.dag_id: (x.first_queued_time, x.last_queued_time)