diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 60a9d3c7dc6b7..62eb79336e704 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -848,7 +848,7 @@ logging: Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter version_added: 2.0.0 type: string - example: "{ti.dag_id}-{ti.task_id}-{execution_date}-{try_number}" + example: "{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{ti.try_number}}" is_template: true default: "" log_filename_template: diff --git a/airflow/utils/log/task_handler_with_custom_formatter.py b/airflow/utils/log/task_handler_with_custom_formatter.py index 3fdda914c5d7e..5cb71c4281fc6 100644 --- a/airflow/utils/log/task_handler_with_custom_formatter.py +++ b/airflow/utils/log/task_handler_with_custom_formatter.py @@ -45,7 +45,8 @@ def set_context(self, ti) -> None: :param ti: :return: """ - if ti.raw or self.formatter is None: + # Returns if there is no formatter or if the prefix has already been set + if ti.raw or self.formatter is None or self.prefix_jinja_template is not None: return prefix = conf.get("logging", "task_log_prefix_template") diff --git a/tests/utils/test_task_handler_with_custom_formatter.py b/tests/utils/test_task_handler_with_custom_formatter.py index 6f53ecfd31e59..c6c6565f54fca 100644 --- a/tests/utils/test_task_handler_with_custom_formatter.py +++ b/tests/utils/test_task_handler_with_custom_formatter.py @@ -74,20 +74,36 @@ def task_instance(): clear_db_runs() -def assert_prefix(task_instance: TaskInstance, prefix: str) -> None: +def assert_prefix_once(task_instance: TaskInstance, prefix: str) -> None: handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) assert handler is not None, "custom task log handler not set up correctly" assert handler.formatter is not None, "custom task log formatter not set up correctly" + previous_formatter = handler.formatter expected_format = f"{prefix}:{handler.formatter._fmt}" set_context(task_instance.log, task_instance) assert expected_format == handler.formatter._fmt + handler.setFormatter(previous_formatter) + + +def assert_prefix_multiple(task_instance: TaskInstance, prefix: str) -> None: + handler = next((h for h in task_instance.log.handlers if h.name == TASK_HANDLER), None) + assert handler is not None, "custom task log handler not set up correctly" + assert handler.formatter is not None, "custom task log formatter not set up correctly" + previous_formatter = handler.formatter + expected_format = f"{prefix}:{handler.formatter._fmt}" + set_context(task_instance.log, task_instance) + set_context(task_instance.log, task_instance) + set_context(task_instance.log, task_instance) + assert expected_format == handler.formatter._fmt + handler.setFormatter(previous_formatter) def test_custom_formatter_default_format(task_instance): """The default format provides no prefix.""" - assert_prefix(task_instance, "") + assert_prefix_once(task_instance, "") -@conf_vars({("logging", "task_log_prefix_template"): "{{ti.dag_id }}-{{ ti.task_id }}"}) +@conf_vars({("logging", "task_log_prefix_template"): "{{ ti.dag_id }}-{{ ti.task_id }}"}) def test_custom_formatter_custom_format_not_affected_by_config(task_instance): - assert_prefix(task_instance, f"{DAG_ID}-{TASK_ID}") + """Certifies that the prefix is only added once, even after repeated calls""" + assert_prefix_multiple(task_instance, f"{DAG_ID}-{TASK_ID}")