From 2aaccafe9cf948074d1e56cfc271ee9c3354c732 Mon Sep 17 00:00:00 2001
From: Igor Kholopov
Date: Tue, 22 Nov 2022 01:03:10 +0100
Subject: [PATCH] Metric for raw task return codes (#27155)

Co-authored-by: Igor Kholopov
---
 airflow/jobs/local_task_job.py    |  6 ++++++
 tests/jobs/test_local_task_job.py | 22 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index e9e47f3c0e7d0..fe7fc4a5613a7 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -157,6 +157,7 @@ def handle_task_exit(self, return_code: int) -> None:
         # Without setting this, heartbeat may get us
         self.terminating = True
         self.log.info("Task exited with return code %s", return_code)
+        self._log_return_code_metric(return_code)
 
         if not self.task_instance.test_mode:
             if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True):
@@ -225,6 +226,11 @@ def heartbeat_callback(self, session=None):
                 self.terminating = True
             self._state_change_checks += 1
 
+    def _log_return_code_metric(self, return_code: int):
+        Stats.incr(
+            f"local_task_job.task_exit.{self.id}.{self.dag_id}.{self.task_instance.task_id}.{return_code}"
+        )
+
     @staticmethod
     def _enable_task_listeners():
         """
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 40eab105d4b6f..ecfb750fd12f8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -374,6 +374,28 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, "return_code")
+    @mock.patch("airflow.jobs.scheduler_job.Stats.incr", autospec=True)
+    def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
+
+        _, task = create_dummy_dag("test_localtaskjob_code")
+
+        ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti_run.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti_run, executor=SequentialExecutor())
+        job1.id = 95
+
+        mock_return_code.side_effect = [None, -9, None]
+
+        with timeout(10):
+            job1.run()
+
+        mock_stats_incr.assert_has_calls(
+            [
+                mock.call("local_task_job.task_exit.95.test_localtaskjob_code.op1.-9"),
+            ]
+        )
+
     @pytest.mark.quarantined
     @patch.object(StandardTaskRunner, "return_code")
     def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):