Skip to content

Commit

Permalink
Metric for raw task return codes (#27155)
Browse files Browse the repository at this point in the history
Co-authored-by: Igor Kholopov <kholopovus@gmail.com>
  • Loading branch information
IKholopov and Igor Kholopov authored Nov 22, 2022
1 parent 0cf3cb1 commit 2aaccaf
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
6 changes: 6 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def handle_task_exit(self, return_code: int) -> None:
# Without setting this, heartbeat may get us
self.terminating = True
self.log.info("Task exited with return code %s", return_code)
self._log_return_code_metric(return_code)

if not self.task_instance.test_mode:
if conf.getboolean("scheduler", "schedule_after_task_execution", fallback=True):
Expand Down Expand Up @@ -225,6 +226,11 @@ def heartbeat_callback(self, session=None):
self.terminating = True
self._state_change_checks += 1

def _log_return_code_metric(self, return_code: int):
    """Increment a counter recording the raw return code of the finished task.

    The metric name has the form
    ``local_task_job.task_exit.<job id>.<dag id>.<task id>.<return code>``.

    :param return_code: return code passed in by the caller, used as the last
        component of the metric name.
    """
    job_id = self.id
    dag_id = self.dag_id
    task_id = self.task_instance.task_id
    metric_name = f"local_task_job.task_exit.{job_id}.{dag_id}.{task_id}.{return_code}"
    Stats.incr(metric_name)

@staticmethod
def _enable_task_listeners():
"""
Expand Down
22 changes: 22 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,28 @@ def test_localtaskjob_double_trigger(self):

session.close()

@patch.object(StandardTaskRunner, "return_code")
@mock.patch("airflow.jobs.local_task_job.Stats.incr", autospec=True)
def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
    """Check that running a LocalTaskJob emits the task-exit return code metric.

    ``Stats.incr`` is patched through ``airflow.jobs.local_task_job``, the module
    whose code is under test, so the test does not depend on what an unrelated
    module happens to import.
    """
    _, task = create_dummy_dag("test_localtaskjob_code")

    ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
    ti_run.refresh_from_db()
    job1 = LocalTaskJob(task_instance=ti_run, executor=SequentialExecutor())
    # Pin the job id so the emitted metric name is deterministic.
    job1.id = 95

    # Successive reads of the runner's return code yield None, -9, None.
    # NOTE(review): presumably None means "still running" and -9 is the exit
    # code the job picks up -- confirm against StandardTaskRunner.
    mock_return_code.side_effect = [None, -9, None]

    with timeout(10):
        job1.run()

    # Expected name: job id 95, the dag id given above, task id "op1"
    # (presumably the task created by create_dummy_dag), return code -9.
    mock_stats_incr.assert_has_calls(
        [
            mock.call("local_task_job.task_exit.95.test_localtaskjob_code.op1.-9"),
        ]
    )

@pytest.mark.quarantined
@patch.object(StandardTaskRunner, "return_code")
def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):
Expand Down

0 comments on commit 2aaccaf

Please sign in to comment.