From 09af70307d5087e7e040a9432ec1f3c81bee980a Mon Sep 17 00:00:00 2001 From: Tiago Deane Date: Mon, 25 Mar 2024 14:47:10 +0000 Subject: [PATCH 1/3] Fix #35622: TaskHandlerWithCustomFormatter now adds prefix only once When TaskHandlerWithCustomFormatter was used to add a prefix to logs, the prefix was being added multiple times. This happened because the handler's set_context method is called multiple times from logging_mixin.py, and it was made worse by the fact that even when the handler's formatter was already a TimezoneAware formatter (to include the UTC offset), another prefix was still added. Because of this, I felt that any solution outside of TaskHandlerWithCustomFormatter itself would either require restructuring the handlers or slow down execution for all other handlers. The solution I settled on was therefore to extend the initial 'if' statement in TaskHandlerWithCustomFormatter.set_context with a simple 'or self.prefix_jinja_template is not None', so that it returns early if the prefix has already been set. This is similar to what is done by the Elasticsearch es_task_handler.py. Note: also fixed the documentation's example for the handler, as the previous one was incorrect and didn't work. 
--- airflow/config_templates/config.yml | 2 +- airflow/utils/log/task_handler_with_custom_formatter.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 60a9d3c7dc6b7..62eb79336e704 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -848,7 +848,7 @@ logging: Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter version_added: 2.0.0 type: string - example: "{ti.dag_id}-{ti.task_id}-{execution_date}-{try_number}" + example: "{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{ti.try_number}}" is_template: true default: "" log_filename_template: diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py index 3fdda914c5d7e..d8af195f6dd0c 100644 --- a/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow/utils/log/task_handler_with_custom_formatter.py @@ -45,7 +45,7 @@ def set_context(self, ti) -> None: :param ti: :return: """ - if ti.raw or self.formatter is None: + if ti.raw or self.formatter is None or self.prefix_jinja_template is not None: return prefix = conf.get("logging", "task_log_prefix_template") From 64f0a34048eabcd82a2b4b2b7b5702dcacd98fec Mon Sep 17 00:00:00 2001 From: Tiago Deane Date: Wed, 27 Mar 2024 17:02:07 +0000 Subject: [PATCH 2/3] Added comment on TaskHandlerWithCustomFormatter --- airflow/utils/log/task_handler_with_custom_formatter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py index d8af195f6dd0c..5cb71c4281fc6 100644 --- a/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow/utils/log/task_handler_with_custom_formatter.py @@ -45,6 +45,7 @@ def set_context(self, ti) -> None: :param ti: :return: """ + # Returns if there is no formatter or if the 
prefix has already been set if ti.raw or self.formatter is None or self.prefix_jinja_template is not None: return prefix = conf.get("logging", "task_log_prefix_template") From cbec41ae624b02fe4f2a36c6f07d25d8afdc695c Mon Sep 17 00:00:00 2001 From: Tiago Deane Date: Wed, 27 Mar 2024 17:03:17 +0000 Subject: [PATCH 3/3] test: expanded test_task_handler_with_custom_formatter.py to try adding the prefix multiple times --- ...test_task_handler_with_custom_formatter.py | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/tests/utils/test_task_handler_with_custom_formatter.py b/tests/utils/test_task_handler_with_custom_formatter.py index 6f53ecfd31e59..c6c6565f54fca 100644 --- a/tests/utils/test_task_handler_with_custom_formatter.py +++ b/tests/utils/test_task_handler_with_custom_formatter.py @@ -74,20 +74,36 @@ def task_instance(): clear_db_runs() -def assert_prefix(task_instance: TaskInstance, prefix: str) -> None: +def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None: handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) assert handler is not None, "custom task log handler not set up correctly" assert handler.formatter is not None, "custom task log formatter not set up correctly" + previous_formatter = handler.formatter expected_format = f"{prefix}:{handler.formatter._fmt}" set_context(task_instance.log, task_instance) assert expected_format == handler.formatter._fmt + handler.setFormatter(previous_formatter) + + +def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None: + handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) + assert handler is not None, "custom task log handler not set up correctly" + assert handler.formatter is not None, "custom task log formatter not set up correctly" + previous_formatter = handler.formatter + expected_format = f"{prefix}:{handler.formatter._fmt}" + set_context(task_instance.log, 
task_instance) + set_context(task_instance.log, task_instance) + set_context(task_instance.log, task_instance) + assert expected_format == handler.formatter._fmt + handler.setFormatter(previous_formatter) def test_custom_formatter_default_format(task_instance): """The default format provides no prefix.""" - assert_prefix(task_instance, "") + assert_prefix_once(task_instance, "") -@conf_vars({("logging", "task_log_prefix_template"): "{{ti.dag_id }}-{{ ti.task_id }}"}) +@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ ti.task_id }}"}) def test_custom_formatter_custom_format_not_affected_by_config(task_instance): - assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}") + """Certifies that the prefix is only added once, even after repeated calls""" + assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}")