From 145b66c006c6859f5c5b4a0ccd3ca566307c2253 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 23 Mar 2024 15:06:42 +0530 Subject: [PATCH 1/4] Make elastic search index_pattern more configurable --- .../elasticsearch/log/es_task_handler.py | 37 ++++++++++++++++--- airflow/providers/elasticsearch/provider.yaml | 9 +++++ .../elasticsearch/log/test_es_task_handler.py | 33 +++++++++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 82cc887553d65..b28167636849e 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -18,6 +18,7 @@ from __future__ import annotations import contextlib +import importlib import inspect import logging import sys @@ -114,6 +115,26 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: raise AirflowException(f"Could not find TaskInstance for {ti}") +def _get_index_patterns( + index_patterns_callable: str | None = None, ti: TaskInstance | None = None +) -> str | None: + """ + Get index patterns by calling index_patterns_callable or None. + + :param index_patterns_callable: A string representing the full path to the callable itself. + :param ti: A TaskInstance object or None. + """ + if not index_patterns_callable: + return None + try: + module_path, index_pattern_function = index_patterns_callable.rsplit(".", 1) + module = importlib.import_module(module_path) + index_pattern_callable_obj = getattr(module, index_pattern_function) + return index_pattern_callable_obj(ti) + except Exception: + return None + + class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. @@ -153,6 +174,7 @@ def __init__( host: str = "http://localhost:9200", frontend: str = "localhost:5601", index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"), + index_patterns_callable: str | None = None, es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs", *, filename_template: str | None = None, @@ -184,6 +206,7 @@ def __init__( self.host_field = host_field self.offset_field = offset_field self.index_patterns = index_patterns + self.index_patterns_callable = index_patterns_callable self.context_set = False self.formatter: logging.Formatter @@ -302,7 +325,7 @@ def _read( offset = metadata["offset"] log_id = self._render_log_id(ti, try_number) - response = self._es_read(log_id, offset) + response = self._es_read(log_id, offset, ti) if response is not None and response.hits: logs_by_host = self._group_logs_by_host(response) next_offset = attrgetter(self.offset_field)(response[-1]) @@ -372,12 +395,15 @@ def _format_msg(self, hit: Hit): # Just a safe-guard to preserve backwards-compatibility return hit.message - def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | None: + def _es_read( + self, log_id: str, offset: int | str, ti: TaskInstance | None = None + ) -> ElasticSearchResponse | None: """ Return the logs matching log_id in Elasticsearch and next offset or ''. :param log_id: the log_id of the log to read. :param offset: the offset start to read log from. + :param ti: the task instance object :meta private: """ @@ -388,16 +414,17 @@ def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | No } } + index_patterns = _get_index_patterns(self.index_patterns_callable, ti) or self.index_patterns try: - max_log_line = self.client.count(index=self.index_patterns, query=query)["count"] # type: ignore + max_log_line = self.client.count(index=index_patterns, query=query)["count"] # type: ignore except NotFoundError as e: - self.log.exception("The target index pattern %s does not exist", self.index_patterns) + self.log.exception("The target index pattern %s does not exist", index_patterns) raise e if max_log_line != 0: try: res = self.client.search( - index=self.index_patterns, + index=index_patterns, query=query, sort=[self.offset_field], size=self.MAX_LINE_PER_PAGE, diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index 73d6ae2fac6f4..60ab0ea4926e7 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -160,10 +160,19 @@ config: index_patterns: description: | Comma separated list of index patterns to use when searching for logs (default: `_all`). + The index_patterns_callable takes precedence over this. version_added: 2.6.0 type: string example: something-* default: "_all" + index_patterns_callable: + description: | + A string representing the full path to the Python callable path which accept TI object and + return comma separated list of index patterns. This will takes precedence over index_patterns. + version_added: 2.9.0 + type: string + example: "module.callable" + default: "" elasticsearch_configs: description: ~ options: diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index da98482916158..d137462d084e8 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import importlib import json import logging import os @@ -25,6 +26,7 @@ from io import StringIO from pathlib import Path from unittest import mock +from unittest.mock import MagicMock from urllib.parse import quote import elasticsearch @@ -32,10 +34,12 @@ import pytest from airflow.configuration import conf +from airflow.models import TaskInstance from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse from airflow.providers.elasticsearch.log.es_task_handler import ( VALID_ES_CONFIG_KEYS, ElasticsearchTaskHandler, + _get_index_patterns, get_es_kwargs_from_config, getattr_nested, ) @@ -643,6 +647,35 @@ def test_dynamic_offset(self, stdout_mock, ti, time_machine): assert second_log["asctime"] == t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") assert third_log["asctime"] == t3.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") + def test_index_patterns_callable(self): + ti = MagicMock(spec=TaskInstance) + + def mock_callable(ti): + return "mocked_index_patterns" + + importlib.import_module = MagicMock() + importlib.import_module.return_value = MagicMock(**{"mock_callable": mock_callable}) + + result = _get_index_patterns("module_path.mock_callable", ti) + + assert result == "mocked_index_patterns" + + def test_index_patterns_none(self): + ti = MagicMock(spec=TaskInstance) + + result = _get_index_patterns(None, ti) + + assert result is None + + def test_index_patterns_exception(self): + ti = MagicMock(spec=TaskInstance) + + importlib.import_module = MagicMock(side_effect=Exception("Mocked exception")) + + result = _get_index_patterns("invalid_module_path.invalid_callable", ti) + + assert result is None + def test_safe_attrgetter(): class A: ... From 9da4cb9d1adfaf3424677cfab2efd8822eaee444 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 4 Apr 2024 19:06:36 +0530 Subject: [PATCH 2/4] Apply review suggestions --- .../elasticsearch/log/es_task_handler.py | 45 ++++++++----------- airflow/providers/elasticsearch/provider.yaml | 6 +-- .../elasticsearch/log/test_es_task_handler.py | 22 +-------- 3 files changed, 24 insertions(+), 49 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index b28167636849e..31306e5f24c5c 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -115,26 +115,6 @@ def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: raise AirflowException(f"Could not find TaskInstance for {ti}") -def _get_index_patterns( - index_patterns_callable: str | None = None, ti: TaskInstance | None = None -) -> str | None: - """ - Get index patterns by calling index_patterns_callable or None. - - :param index_patterns_callable: A string representing the full path to the callable itself. - :param ti: A TaskInstance object or None. - """ - if not index_patterns_callable: - return None - try: - module_path, index_pattern_function = index_patterns_callable.rsplit(".", 1) - module = importlib.import_module(module_path) - index_pattern_callable_obj = getattr(module, index_pattern_function) - return index_pattern_callable_obj(ti) - except Exception: - return None - - class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. @@ -173,8 +153,8 @@ def __init__( offset_field: str = "offset", host: str = "http://localhost:9200", frontend: str = "localhost:5601", - index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"), - index_patterns_callable: str | None = None, + index_patterns: str = conf.get("elasticsearch", "index_patterns"), + index_patterns_callable: str = conf.get("elasticsearch", "index_patterns_callable", fallback=""), es_kwargs: dict | None | Literal["default_es_kwargs"] = "default_es_kwargs", *, filename_template: str | None = None, @@ -236,6 +216,21 @@ def format_url(host: str) -> str: return host + def _get_index_patterns(self, ti: TaskInstance | None) -> str: + """ + Get index patterns by calling index_patterns_callable, if provided, or the configured index_patterns. + + :param ti: A TaskInstance object or None. + """ + if self.index_patterns_callable: + self.log.debug("Using index_patterns_callable: %s", self.index_patterns_callable) + module_path, index_pattern_function = self.index_patterns_callable.rsplit(".", 1) + module = importlib.import_module(module_path) + index_pattern_callable_obj = getattr(module, index_pattern_function) + return index_pattern_callable_obj(ti) + self.log.debug("Using index_patterns: %s", self.index_patterns) + return self.index_patterns + def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str: from airflow.models.taskinstance import TaskInstanceKey @@ -395,9 +390,7 @@ def _format_msg(self, hit: Hit): # Just a safe-guard to preserve backwards-compatibility return hit.message - def _es_read( - self, log_id: str, offset: int | str, ti: TaskInstance | None = None - ) -> ElasticSearchResponse | None: + def _es_read(self, log_id: str, offset: int | str, ti: TaskInstance) -> ElasticSearchResponse | None: """ Return the logs matching log_id in Elasticsearch and next offset or ''. @@ -414,7 +407,7 @@ def _es_read( } } - index_patterns = _get_index_patterns(self.index_patterns_callable, ti) or self.index_patterns + index_patterns = self._get_index_patterns(ti) try: max_log_line = self.client.count(index=index_patterns, query=query)["count"] # type: ignore except NotFoundError as e: diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index 60ab0ea4926e7..c6a569fadeed2 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -169,10 +169,10 @@ config: description: | A string representing the full path to the Python callable path which accept TI object and return comma separated list of index patterns. This will takes precedence over index_patterns. - version_added: 2.9.0 + version_added: 5.4.0 type: string - example: "module.callable" - default: "" + example: module.callable + default: ~ elasticsearch_configs: description: ~ options: diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index d137462d084e8..1a75c3464c5b8 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -39,7 +39,6 @@ from airflow.providers.elasticsearch.log.es_task_handler import ( VALID_ES_CONFIG_KEYS, ElasticsearchTaskHandler, - _get_index_patterns, get_es_kwargs_from_config, getattr_nested, ) @@ -655,27 +654,10 @@ def mock_callable(ti): importlib.import_module = MagicMock() importlib.import_module.return_value = MagicMock(**{"mock_callable": mock_callable}) - - result = _get_index_patterns("module_path.mock_callable", ti) - + self.es_task_handler.index_patterns_callable = "module_path.mock_callable" + result = self.es_task_handler._get_index_patterns(ti) assert result == "mocked_index_patterns" - def test_index_patterns_none(self): - ti = MagicMock(spec=TaskInstance) - - result = _get_index_patterns(None, ti) - - assert result is None - - def test_index_patterns_exception(self): - ti = MagicMock(spec=TaskInstance) - - importlib.import_module = MagicMock(side_effect=Exception("Mocked exception")) - - result = _get_index_patterns("invalid_module_path.invalid_callable", ti) - - assert result is None - def test_safe_attrgetter(): class A: ... From 2cd528a28a9a3b5ab5e6e912382f7469876a2c74 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sun, 21 Apr 2024 21:04:32 +0530 Subject: [PATCH 3/4] Fix default value --- airflow/providers/elasticsearch/provider.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index c6a569fadeed2..db252ef1888cc 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -172,7 +172,7 @@ config: version_added: 5.4.0 type: string example: module.callable - default: ~ + default: "" elasticsearch_configs: description: ~ options: From 957946f7e91f3f70409fd28eba5b92293764cb27 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 22 May 2024 16:10:50 +0530 Subject: [PATCH 4/4] Apply review suggetions --- .../elasticsearch/log/es_task_handler.py | 6 ++--- airflow/providers/elasticsearch/provider.yaml | 2 +- .../elasticsearch/log/test_es_task_handler.py | 23 ++++++++----------- 3 files changed, 13 insertions(+), 18 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 31306e5f24c5c..c397e2b3585ff 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -18,7 +18,6 @@ from __future__ import annotations import contextlib -import importlib import inspect import logging import sys @@ -42,6 +41,7 @@ from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin +from airflow.utils.module_loading import import_string from airflow.utils.session import create_session if TYPE_CHECKING: @@ -224,9 +224,7 @@ def _get_index_patterns(self, ti: TaskInstance | None) -> str: """ if self.index_patterns_callable: self.log.debug("Using index_patterns_callable: %s", self.index_patterns_callable) - module_path, index_pattern_function = self.index_patterns_callable.rsplit(".", 1) - module = importlib.import_module(module_path) - index_pattern_callable_obj = getattr(module, index_pattern_function) + index_pattern_callable_obj = import_string(self.index_patterns_callable) return index_pattern_callable_obj(ti) self.log.debug("Using index_patterns: %s", self.index_patterns) return self.index_patterns diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml index db252ef1888cc..46ae0530673dd 100644 --- a/airflow/providers/elasticsearch/provider.yaml +++ b/airflow/providers/elasticsearch/provider.yaml @@ -169,7 +169,7 @@ config: description: | A string representing the full path to the Python callable path which accept TI object and return comma separated list of index patterns. This will takes precedence over index_patterns. - version_added: 5.4.0 + version_added: 5.5.0 type: string example: module.callable default: "" diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 1a75c3464c5b8..c920f0d466871 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -17,7 +17,6 @@ # under the License. from __future__ import annotations -import importlib import json import logging import os @@ -26,7 +25,7 @@ from io import StringIO from pathlib import Path from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import Mock, patch from urllib.parse import quote import elasticsearch @@ -34,7 +33,6 @@ import pytest from airflow.configuration import conf -from airflow.models import TaskInstance from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse from airflow.providers.elasticsearch.log.es_task_handler import ( VALID_ES_CONFIG_KEYS, @@ -52,7 +50,6 @@ pytestmark = pytest.mark.db_test - AIRFLOW_SOURCES_ROOT_DIR = Path(__file__).parents[4].resolve() ES_PROVIDER_YAML_FILE = AIRFLOW_SOURCES_ROOT_DIR / "airflow" / "providers" / "elasticsearch" / "provider.yaml" @@ -646,17 +643,17 @@ def test_dynamic_offset(self, stdout_mock, ti, time_machine): assert second_log["asctime"] == t2.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") assert third_log["asctime"] == t3.format("YYYY-MM-DDTHH:mm:ss.SSSZZ") - def test_index_patterns_callable(self): - ti = MagicMock(spec=TaskInstance) + def test_get_index_patterns_with_callable(self): + with patch("airflow.providers.elasticsearch.log.es_task_handler.import_string") as mock_import_string: + mock_callable = Mock(return_value="callable_index_pattern") + mock_import_string.return_value = mock_callable - def mock_callable(ti): - return "mocked_index_patterns" + self.es_task_handler.index_patterns_callable = "path.to.index_pattern_callable" + result = self.es_task_handler._get_index_patterns({}) - importlib.import_module = MagicMock() - importlib.import_module.return_value = MagicMock(**{"mock_callable": mock_callable}) - self.es_task_handler.index_patterns_callable = "module_path.mock_callable" - result = self.es_task_handler._get_index_patterns(ti) - assert result == "mocked_index_patterns" + mock_import_string.assert_called_once_with("path.to.index_pattern_callable") + mock_callable.assert_called_once_with({}) + assert result == "callable_index_pattern" def test_safe_attrgetter():