Skip to content

Commit

Permalink
Resolve backfill job deprecations in tests (apache#39961)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirrao authored and romsharon98 committed Jul 26, 2024
1 parent 83ca0bd commit 5ab064b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 39 deletions.
11 changes: 0 additions & 11 deletions tests/deprecations_ignore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,6 @@
- tests/core/test_stats.py::TestStats::test_load_allow_list_validator
- tests/core/test_stats.py::TestStats::test_load_block_list_validator
- tests/core/test_stats.py::TestStats::test_load_custom_statsd_client
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_backwards
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_pooled_tasks
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_failed_tasks_without_flag
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_rerun_upstream_failed_tasks
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_run_rescheduled
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_backfill_skip_active_scheduled_dagrun
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
- tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
- tests/models/test_cleartasks.py::TestClearTasks::test_dags_clear
- tests/models/test_dag.py::TestDag::test_bulk_write_to_db_interval_save_runtime
- tests/models/test_dag.py::TestDag::test_bulk_write_to_db_max_active_runs
Expand Down
56 changes: 28 additions & 28 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
logger = logging.getLogger(__name__)

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
DEFAULT_DAG_RUN_ID = "test1"


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -676,7 +677,7 @@ def test_backfill_run_rescheduled(self, dag_maker):
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1"
)
dag_maker.create_dagrun(state=None)
dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)

executor = MockExecutor()

Expand All @@ -689,7 +690,7 @@ def test_backfill_run_rescheduled(self, dag_maker):
)
run_job(job=job, execute_callable=job_runner._execute)

ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.UP_FOR_RESCHEDULE)

Expand All @@ -702,7 +703,7 @@ def test_backfill_run_rescheduled(self, dag_maker):
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS

Expand Down Expand Up @@ -741,10 +742,7 @@ def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog):
dag_id="test_backfill_skip_active_scheduled_dagrun",
task_id="test_backfill_skip_active_scheduled_dagrun-1",
)
dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
)
dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING, run_id=DEFAULT_DAG_RUN_ID)

executor = MockExecutor()

Expand All @@ -760,9 +758,7 @@ def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog):
run_job(job=job, execute_callable=job_runner._execute)
assert "Backfill cannot be created for DagRun" in caplog.messages[0]

ti = TI(
task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), execution_date=DEFAULT_DATE
)
ti = TI(task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
# since DAG backfill is skipped, task state should be none
assert ti.state == State.NONE
Expand All @@ -771,7 +767,7 @@ def test_backfill_rerun_failed_tasks(self, dag_maker):
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
)
dag_maker.create_dagrun(state=None)
dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)

executor = MockExecutor()

Expand All @@ -784,7 +780,7 @@ def test_backfill_rerun_failed_tasks(self, dag_maker):
)
run_job(job=job, execute_callable=job_runner._execute)

ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.FAILED)

Expand All @@ -797,7 +793,7 @@ def test_backfill_rerun_failed_tasks(self, dag_maker):
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS

Expand All @@ -806,7 +802,7 @@ def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
op1 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-1")
op2 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-2")
op1.set_upstream(op2)
dag_maker.create_dagrun(state=None)
dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)

executor = MockExecutor()

Expand All @@ -819,7 +815,7 @@ def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
)
run_job(job=job, execute_callable=job_runner._execute)

ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.UPSTREAM_FAILED)

Expand All @@ -832,15 +828,15 @@ def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
assert ti.state == State.SUCCESS

def test_backfill_rerun_failed_tasks_without_flag(self, dag_maker):
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
)
dag_maker.create_dagrun(state=None)
dag_maker.create_dagrun(state=None, run_id=DEFAULT_DAG_RUN_ID)

executor = MockExecutor()

Expand All @@ -853,7 +849,7 @@ def test_backfill_rerun_failed_tasks_without_flag(self, dag_maker):
)
run_job(job=job, execute_callable=job_runner._execute)

ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), run_id=DEFAULT_DAG_RUN_ID)
ti.refresh_from_db()
ti.set_state(State.FAILED)

Expand Down Expand Up @@ -1012,7 +1008,8 @@ def test_backfill_pooled_tasks(self):
run_job(job=job, execute_callable=job_runner._execute)
except AirflowTaskTimeout:
pass
ti = TI(task=dag.get_task("test_backfill_pooled_task"), execution_date=DEFAULT_DATE)
run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
ti = TI(task=dag.get_task("test_backfill_pooled_task"), run_id=run_id)
ti.refresh_from_db()
assert ti.state == State.SUCCESS

Expand All @@ -1034,8 +1031,9 @@ def test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past(
)
run_job(job=job, execute_callable=job_runner._execute)

run_id = f"backfill__{run_date.isoformat()}"
# ti should have succeeded
ti = TI(dag.tasks[0], run_date)
ti = TI(dag.tasks[0], run_id=run_id)
ti.refresh_from_db()
assert ti.state == State.SUCCESS

Expand All @@ -1058,7 +1056,8 @@ def test_backfill_depends_on_past_backwards(self):
job_runner = BackfillJobRunner(job=job, dag=dag, ignore_first_depends_on_past=True, **kwargs)
run_job(job=job, execute_callable=job_runner._execute)

ti = TI(dag.get_task("test_dop_task"), end_date)
run_id = f"backfill__{end_date.isoformat()}"
ti = TI(dag.get_task("test_dop_task"), run_id=run_id)
ti.refresh_from_db()
# runs fine forwards
assert ti.state == State.SUCCESS
Expand Down Expand Up @@ -1441,15 +1440,16 @@ def test_subdag_clear_parentdag_downstream_clear(self):
with timeout(seconds=30):
run_job(job=job, execute_callable=job_runner._execute)

ti_subdag = TI(task=dag.get_task("daily_job"), execution_date=DEFAULT_DATE)
run_id = f"backfill__{DEFAULT_DATE.isoformat()}"
ti_subdag = TI(task=dag.get_task("daily_job"), run_id=run_id)
ti_subdag.refresh_from_db()
assert ti_subdag.state == State.SUCCESS

ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"), execution_date=DEFAULT_DATE)
ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"), run_id=run_id)
ti_irrelevant.refresh_from_db()
assert ti_irrelevant.state == State.SUCCESS

ti_downstream = TI(task=dag.get_task("daily_job_downstream"), execution_date=DEFAULT_DATE)
ti_downstream = TI(task=dag.get_task("daily_job_downstream"), run_id=run_id)
ti_downstream.refresh_from_db()
assert ti_downstream.state == State.SUCCESS

Expand Down Expand Up @@ -1530,7 +1530,7 @@ def test_update_counters(self, dag_maker, session):
dr = dag_maker.create_dagrun(state=None)
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
ti = TI(task1, dr.execution_date)
ti = TI(task1, run_id=dr.run_id)
ti.refresh_from_db()

ti_status = BackfillJobRunner._DagRunTaskStatus()
Expand Down Expand Up @@ -1742,15 +1742,15 @@ def test_reset_orphaned_tasks_with_orphans(self, dag_maker):
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
# create dagruns
dr1 = dag_maker.create_dagrun(state=State.RUNNING)
dr1 = dag_maker.create_dagrun(run_id=DEFAULT_DAG_RUN_ID, state=State.RUNNING)
dr2 = dag.create_dagrun(run_id="test2", state=State.SUCCESS)

# create taskinstances and set states
dr1_tis = []
dr2_tis = []
for task, state in zip(tasks, states):
ti1 = TI(task, dr1.execution_date)
ti2 = TI(task, dr2.execution_date)
ti1 = TI(task, run_id=dr1.run_id)
ti2 = TI(task, run_id=dr2.run_id)
ti1.refresh_from_db()
ti2.refresh_from_db()
ti1.state = state
Expand Down

0 comments on commit 5ab064b

Please sign in to comment.