Skip to content

Commit

Permalink
Fix failing dagrun query due to sqlalchemy 2.0 refactor (#32801)
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy authored Jul 24, 2023
1 parent 73bc49a commit 0339f94
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
7 changes: 3 additions & 4 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2316,9 +2316,8 @@ def clear(self, *, session: Session = NEW_SESSION):

# Lock the related dag runs to prevent from possible dead lock.
# https://github.com/apache/airflow/pull/26658
dag_runs_query = session.scalars(
select(DagRun.id).where(DagRun.dag_id == dag_id).with_for_update()
)
dag_runs_query = select(DagRun.id).where(DagRun.dag_id == dag_id).with_for_update()

if start_date is None and end_date is None:
dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
else:
Expand All @@ -2328,7 +2327,7 @@ def clear(self, *, session: Session = NEW_SESSION):
if end_date is not None:
dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)

locked_dag_run_ids = dag_runs_query.all()
locked_dag_run_ids = session.scalars(dag_runs_query).all()
elif task_id:
if map_indexes is None:
task_ids = [task_id]
Expand Down
23 changes: 23 additions & 0 deletions tests/www/views/test_views_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def init_dagruns(app, reset_dagruns):
start_date=timezone.utcnow(),
state=State.RUNNING,
)
app.dag_bag.get_dag("example_task_group").create_dagrun(
run_id=DEFAULT_DAGRUN,
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
data_interval=(DEFAULT_DATE, DEFAULT_DATE),
start_date=timezone.utcnow(),
state=State.RUNNING,
)
yield
clear_db_runs()

Expand Down Expand Up @@ -506,12 +514,27 @@ def test_code_from_db_all_example_dags(admin_client):
),
"example_bash_operator",
),
(
"clear",
dict(
group_id="section_1",
dag_id="example_task_group",
execution_date=DEFAULT_DATE,
upstream="false",
downstream="false",
future="false",
past="false",
only_failed="false",
),
"example_task_group",
),
],
ids=[
"paused",
"failed-flash-hint",
"success-flash-hint",
"clear",
"clear-task-group",
],
)
def test_views_post(admin_client, url, data, content):
Expand Down

0 comments on commit 0339f94

Please sign in to comment.