From 4cf73f2e4866ed363bc0ee4e7c7180d13ab8268c Mon Sep 17 00:00:00 2001 From: aniaan Date: Thu, 16 Sep 2021 20:06:34 +0800 Subject: [PATCH 1/3] feat(query): add time_zone param --- .github/workflows/ci.yml | 3 ++ es/baseapi.py | 11 +++-- es/elastic/api.py | 2 +- es/opendistro/api.py | 2 +- es/tests/test_dbapi.py | 90 ++++++++++++++++++++++++++++++++++------ 5 files changed, 90 insertions(+), 18 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 39cdc7b..6727919 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,6 +83,7 @@ jobs: run: | export ES_URI="http://localhost:9200" export ES_PORT=9200 + export ES_SUPPORT_DATETIME_PARSE=False nosetests -v --with-coverage --cover-package=es es.tests - name: Run tests on Elasticsearch 7.10.X run: | @@ -97,6 +98,7 @@ jobs: export ES_PORT=19200 export ES_SCHEME=https export ES_USER=admin + export ES_SUPPORT_DATETIME_PARSE=False nosetests -v --with-coverage --cover-package=es es.tests - name: Run tests on Opendistro 13 run: | @@ -107,6 +109,7 @@ jobs: export ES_SCHEME=https export ES_USER=admin export ES_V2=True + export ES_SUPPORT_DATETIME_PARSE=False nosetests -v --with-coverage --cover-package=es es.tests - name: Upload code coverage run: | diff --git a/es/baseapi.py b/es/baseapi.py index 6cb4daa..a092c5a 100644 --- a/es/baseapi.py +++ b/es/baseapi.py @@ -109,7 +109,7 @@ def get_description_from_columns( class BaseConnection(object): - """Connection to an ES Cluster """ + """Connection to an ES Cluster""" def __init__( self, @@ -192,6 +192,7 @@ def __init__(self, url: str, es: Elasticsearch, **kwargs): self.es = es self.sql_path = kwargs.get("sql_path", DEFAULT_SQL_PATH) self.fetch_size = kwargs.get("fetch_size", DEFAULT_FETCH_SIZE) + self.time_zone: Optional[str] = kwargs.get("time_zone") # This read/write attribute specifies the number of rows to fetch at a # time with .fetchmany(). It defaults to 1 meaning to fetch a single # row at a time. 
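The hunk above only stores the new `time_zone` keyword on the cursor; the `elastic_query()` hunk further down is what attaches it to the SQL payload. As a quick illustration of how a caller is expected to supply the value, here is a sketch that mirrors the README example added later in this series (the host is just a placeholder):

```python
from es.elastic.api import connect

# Extra keyword arguments passed to connect() are handed down to the cursor,
# so the new time_zone value ends up as BaseCursor.time_zone and, from there,
# in the payload built by elastic_query().
conn = connect(host="localhost", time_zone="Asia/Shanghai")
curs = conn.cursor()
print(curs.time_zone)  # "Asia/Shanghai"
```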
@@ -218,7 +219,7 @@ def custom_sql_to_method_dispatcher(self, command: str) -> Optional["BaseCursor" @check_result @check_closed def rowcount(self) -> int: - """ Counts the number of rows on a result """ + """Counts the number of rows on a result""" if self._results: return len(self._results) return 0 @@ -230,7 +231,7 @@ def close(self) -> None: @check_closed def execute(self, operation, parameters=None) -> "BaseCursor": - """ Children must implement their own custom execute """ + """Children must implement their own custom execute""" raise NotImplementedError # pragma: no cover @check_closed @@ -311,11 +312,13 @@ def elastic_query(self, query: str) -> Dict[str, Any]: payload = {"query": query} if self.fetch_size is not None: payload["fetch_size"] = self.fetch_size + if self.time_zone is not None: + payload["time_zone"] = self.time_zone path = f"/{self.sql_path}/" try: response = self.es.transport.perform_request("POST", path, body=payload) except es_exceptions.ConnectionError: - raise exceptions.OperationalError(f"Error connecting to Elasticsearch") + raise exceptions.OperationalError("Error connecting to Elasticsearch") except es_exceptions.RequestError as ex: raise exceptions.ProgrammingError( f"Error ({ex.error}): {ex.info['error']['reason']}" diff --git a/es/elastic/api.py b/es/elastic/api.py index ed1a4c4..13e41f8 100644 --- a/es/elastic/api.py +++ b/es/elastic/api.py @@ -38,7 +38,7 @@ def connect( class Connection(BaseConnection): - """Connection to an ES Cluster """ + """Connection to an ES Cluster""" def __init__( self, diff --git a/es/opendistro/api.py b/es/opendistro/api.py index b505c82..a27c14b 100644 --- a/es/opendistro/api.py +++ b/es/opendistro/api.py @@ -42,7 +42,7 @@ def connect( class Connection(BaseConnection): - """Connection to an ES Cluster """ + """Connection to an ES Cluster""" def __init__( self, diff --git a/es/tests/test_dbapi.py b/es/tests/test_dbapi.py index e42a306..5968b70 100644 --- a/es/tests/test_dbapi.py +++ b/es/tests/test_dbapi.py @@ -7,28 +7,35 @@ from es.opendistro.api import connect as open_connect +def convert_bool(value: str) -> bool: + return True if value == "True" else False + + class TestDBAPI(unittest.TestCase): def setUp(self): self.driver_name = os.environ.get("ES_DRIVER", "elasticsearch") - host = os.environ.get("ES_HOST", "localhost") - port = int(os.environ.get("ES_PORT", 9200)) - scheme = os.environ.get("ES_SCHEME", "http") - verify_certs = os.environ.get("ES_VERIFY_CERTS", False) - user = os.environ.get("ES_USER", None) - password = os.environ.get("ES_PASSWORD", None) + self.host = os.environ.get("ES_HOST", "localhost") + self.port = int(os.environ.get("ES_PORT", 9200)) + self.scheme = os.environ.get("ES_SCHEME", "http") + self.verify_certs = os.environ.get("ES_VERIFY_CERTS", False) + self.user = os.environ.get("ES_USER", None) + self.password = os.environ.get("ES_PASSWORD", None) self.v2 = bool(os.environ.get("ES_V2", False)) + self.support_datetime_parse = convert_bool( + os.environ.get("ES_SUPPORT_DATETIME_PARSE", "True") + ) if self.driver_name == "elasticsearch": self.connect_func = elastic_connect else: self.connect_func = open_connect self.conn = self.connect_func( - host=host, - port=port, - scheme=scheme, - verify_certs=verify_certs, - user=user, - password=password, + host=self.host, + port=self.port, + scheme=self.scheme, + verify_certs=self.verify_certs, + user=self.user, + password=self.password, v2=self.v2, ) self.cursor = self.conn.cursor() @@ -213,3 +220,62 @@ def test_https(self, mock_elasticsearch): 
mock_elasticsearch.assert_called_once_with( "https://localhost:9200/", http_auth=("user", "password") ) + + def test_simple_search_with_time_zone(self): + """ + DBAPI: Test simple search with time zone + UTC -> CST + 2019-10-13T00:00:00.000Z => 2019-10-13T08:00:00.000+08:00 + 2019-10-13T00:00:01.000Z => 2019-10-13T08:01:00.000+08:00 + 2019-10-13T00:00:02.000Z => 2019-10-13T08:02:00.000+08:00 + """ + + if not self.support_datetime_parse: + return + + conn = self.connect_func( + host=self.host, + port=self.port, + scheme=self.scheme, + verify_certs=self.verify_certs, + user=self.user, + password=self.password, + v2=self.v2, + time_zone="Asia/Shanghai", + ) + cursor = conn.cursor() + pattern = "yyyy-MM-dd HH:mm:ss" + sql = f""" + SELECT timestamp FROM data1 + WHERE timestamp >= DATETIME_PARSE('2019-10-13 00:08:00', '{pattern}') + """ + + rows = cursor.execute(sql).fetchall() + self.assertEqual(len(rows), 3) + + def test_simple_search_without_time_zone(self): + """ + DBAPI: Test simple search without time zone + """ + + if not self.support_datetime_parse: + return + + conn = self.connect_func( + host=self.host, + port=self.port, + scheme=self.scheme, + verify_certs=self.verify_certs, + user=self.user, + password=self.password, + v2=self.v2, + ) + cursor = conn.cursor() + pattern = "yyyy-MM-dd HH:mm:ss" + sql = f""" + SELECT * FROM data1 + WHERE timestamp >= DATETIME_PARSE('2019-10-13 08:00:00', '{pattern}') + """ + + rows = cursor.execute(sql).fetchall() + self.assertEqual(len(rows), 0) From de193370e87e64ca5c978b8b5d2e0ffb740b7fa4 Mon Sep 17 00:00:00 2001 From: aniaan Date: Mon, 27 Sep 2021 22:46:30 +0800 Subject: [PATCH 2/3] docs(README): update README --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index c1975b4..2d792c1 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,18 @@ If more than 10000 rows should get fetched then [max_result_window](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/index-modules.html#dynamic-index-settings) has to be adapted as well. +#### Time zone + +By default, elasticsearch query time zone defaults to `Z` (UTC). This can be adapted through the `time_zone` +parameter: + +```python +from es.elastic.api import connect + +conn = connect(host='localhost') +curs = conn.cursor(time_zone="Asia/Shanghai") +``` + ### Tests To run unittest launch elasticsearch and kibana (kibana is really not required but is a nice to have) From 1de5c45a47453c2ca1d367dd98f8060f200cb65e Mon Sep 17 00:00:00 2001 From: aniaan Date: Mon, 27 Sep 2021 22:57:01 +0800 Subject: [PATCH 3/3] fix(README): fix code --- README.md | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 2d792c1..ff4a6e2 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,14 @@ [![Coverage Status](https://codecov.io/github/preset-io/elasticsearch-dbapi/coverage.svg?branch=master)](https://codecov.io/github/preset-io/elasticsearch-dbapi) -`elasticsearch-dbapi` Implements a DBAPI (PEP-249) and SQLAlchemy dialect, -that enables SQL access on elasticsearch clusters for query only access. +`elasticsearch-dbapi` Implements a DBAPI (PEP-249) and SQLAlchemy dialect, +that enables SQL access on elasticsearch clusters for query only access. 
On Elastic Elasticsearch: Uses Elastic X-Pack [SQL API](https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html) On AWS ES, opendistro Elasticsearch: -[Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/) +[Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/) This library supports Elasticsearch 7.X versions. @@ -21,13 +21,13 @@ This library supports Elasticsearch 7.X versions. ```bash $ pip install elasticsearch-dbapi -``` +``` To install support for AWS Elasticsearch Service / [Open Distro](https://opendistro.github.io/for-elasticsearch/features/SQL%20Support.html): ```bash $ pip install elasticsearch-dbapi[opendistro] -``` +``` ### Usage: @@ -92,7 +92,7 @@ print(logs.columns) [elasticsearch-py](https://elasticsearch-py.readthedocs.io/en/master/index.html) is used to establish connections and transport, this is the official elastic python library. `Elasticsearch` constructor accepts multiple optional parameters -that can be used to properly configure your connection on aspects like security, performance +that can be used to properly configure your connection on aspects like security, performance and high availability. These optional parameters can be set at the connection string, for example: @@ -112,12 +112,14 @@ The connection string follows RFC-1738, to support multiple nodes you should use By default the maximum number of rows which get fetched by a single query is limited to 10000. This can be adapted through the `fetch_size` parameter: + ```python from es.elastic.api import connect -conn = connect(host='localhost') -curs = conn.cursor(fetch_size=1000) +conn = connect(host="localhost", fetch_size=1000) +curs = conn.cursor() ``` + If more than 10000 rows should get fetched then [max_result_window](https://www.elastic.co/guide/en/elasticsearch/reference/7.x/index-modules.html#dynamic-index-settings) has to be adapted as well. @@ -130,8 +132,8 @@ parameter: ```python from es.elastic.api import connect -conn = connect(host='localhost') -curs = conn.cursor(time_zone="Asia/Shanghai") +conn = connect(host="localhost", time_zone="Asia/Shanghai") +curs = conn.cursor() ``` ### Tests @@ -145,7 +147,7 @@ $ nosetests -v ### Special case for sql opendistro endpoint (AWS ES) -AWS ES exposes the opendistro SQL plugin, and it follows a different SQL dialect. +AWS ES exposes the opendistro SQL plugin, and it follows a different SQL dialect. Using the `odelasticsearch` driver: ```python @@ -215,7 +217,7 @@ Using the new SQL engine: Opendistro 1.13.0 brings (enabled by default) a new SQL engine, with lots of improvements and fixes. Take a look at the [release notes](https://github.com/opendistro-for-elasticsearch/sql/blob/develop/docs/dev/NewSQLEngine.md) -This DBAPI has to behave slightly different for SQL v1 and SQL v2, by default we comply with v1, +This DBAPI has to behave slightly different for SQL v1 and SQL v2, by default we comply with v1, to enable v2 support, pass `v2=true` has a query parameter. ``` @@ -229,14 +231,14 @@ To connect to the provided Opendistro ES on `docker-compose` use the following U This library does not yet support the following features: -- Array type columns are not supported. Elaticsearch SQL does not support them either. +- Array type columns are not supported. Elaticsearch SQL does not support them either. SQLAlchemy `get_columns` will exclude them. 
- `object` and `nested` column types are not well supported and are converted to strings
- Indexes whose names start with `.`
- GEO points are not currently well-supported and are converted to strings
- AWS ES (opendistro elasticsearch) is supported (still beta), known limitations are:
  * You are only able to `GROUP BY` keyword fields (new [experimental](https://github.com/opendistro-for-elasticsearch/sql#experimental)
opendistro SQL already supports it)
  * Indices with dots are not supported (indices like 'audit_log.2021.01.20'),
in these cases we recommend the use of aliases
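Taken together, the three patches let a caller pin the Elasticsearch SQL time zone at `connect()` time. Below is a minimal end-to-end sketch of the behaviour the new tests assert: the `data1` index with documents at `2019-10-13T00:00:00Z`–`00:00:02Z` is the fixture used by those tests, the host is a placeholder, and `DATETIME_PARSE` is only available on Elasticsearch versions recent enough that the CI does not export `ES_SUPPORT_DATETIME_PARSE=False` for them.

```python
from es.elastic.api import connect

pattern = "yyyy-MM-dd HH:mm:ss"
sql = f"""
    SELECT timestamp FROM data1
    WHERE timestamp >= DATETIME_PARSE('2019-10-13 08:00:00', '{pattern}')
"""

# Without time_zone the literal is interpreted as UTC, so no fixture document
# is >= 2019-10-13T08:00:00Z and nothing matches.
curs_utc = connect(host="localhost").cursor()
print(len(curs_utc.execute(sql).fetchall()))  # 0

# With time_zone the literal is interpreted as Asia/Shanghai (UTC+8), i.e.
# 2019-10-13T00:00:00Z, so all three fixture documents match.
curs_cst = connect(host="localhost", time_zone="Asia/Shanghai").cursor()
print(len(curs_cst.execute(sql).fetchall()))  # 3
```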