diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 04bf3428..c0ca74e2 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -43,12 +43,9 @@ Run the following command in your terminal to create a virtual environment in th
 tox --devenv .venv -e {environment-name}
 ```
 The `--devenv` flag tells `tox` to create a development environment, and `.venv` is the folder where the virtual environment will be created.
-Pre-defined environments can be found within the `tox.ini` file for different Python versions and their corresponding PySpark version. They include:
-- py37-pyspark300
-- py38-pyspark312
-- py38-pyspark321
-- py39-pyspark330
-- py39-pyspark332
+
+## Environments we test
+The environments we test against are defined within the `tox.ini` file, and the requirements for those environments are stored in `python/tests/requirements`. The makeup of these environments is inspired by the [Databricks Runtime](https://docs.databricks.com/en/release-notes/runtime/index.html#) (hence the naming convention), but it's important to note that developing on Databricks is **not** a requirement. We simply mimic some of the runtime versions because (a) much of the user base runs `tempo` on Databricks and (b) it saves the development time otherwise spent building test environments with different versions of Python and PySpark from scratch.
 
 ## Run tests locally for one or more environments
 You can run tests locally for one or more of the defined environments without setting up a development environment first.
diff --git a/docs/requirements.txt b/docs/requirements.txt
new file mode 100644
index 00000000..7a76c34a
--- /dev/null
+++ b/docs/requirements.txt
@@ -0,0 +1,6 @@
+sphinx-autobuild==2021.3.14
+sphinx-copybutton==0.5.1
+Sphinx==4.5.0
+sphinx-design==0.2.0
+sphinx-panels==0.6.0
+furo==2022.9.29
\ No newline at end of file
diff --git a/python/requirements.txt b/python/requirements.txt
deleted file mode 100644
index 1a6844a9..00000000
--- a/python/requirements.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-ipython==8.10.0
-numpy==1.24.3
-chispa==0.9.2
-pandas==1.5.2
-pyarrow==12.0.0
-python-dateutil==2.8.2
-pytz==2022.7.1
-scipy==1.10.1
-six==1.16.0
-wheel==0.38.4
-semver==2.13.0
-sphinx-autobuild==2021.3.14
-furo==2022.9.29
-sphinx-copybutton==0.5.1
-Sphinx==4.5.0
-sphinx-design==0.2.0
-sphinx-panels==0.6.0
-jsonref==1.1.0
-python-dateutil==2.8.2
diff --git a/python/tempo/io.py b/python/tempo/io.py
index f3466ef5..22fe4d8e 100644
--- a/python/tempo/io.py
+++ b/python/tempo/io.py
@@ -1,16 +1,14 @@
 from __future__ import annotations
 
 import logging
-import os
 from collections import deque
 from typing import Optional
 
 import pyspark.sql.functions as sfn
+import tempo.tsdf as t_tsdf
 from pyspark.sql import SparkSession
 from pyspark.sql.utils import ParseException
 
-import tempo.tsdf as t_tsdf
-
 logger = logging.getLogger(__name__)
 
 
@@ -31,12 +29,6 @@ def write(
     df = tsdf.df
     ts_col = tsdf.ts_col
     partitionCols = tsdf.partitionCols
-    if optimizationCols:
-        optimizationCols = optimizationCols + ["event_time"]
-    else:
-        optimizationCols = ["event_time"]
-
-    useDeltaOpt = os.getenv("DATABRICKS_RUNTIME_VERSION") is not None
 
     view_df = df.withColumn("event_dt", sfn.to_date(sfn.col(ts_col))).withColumn(
         "event_time",
@@ -52,11 +44,12 @@ def write(
         tabName
     )
 
-    if useDeltaOpt:
+    if optimizationCols:
         try:
             spark.sql(
                 "optimize {} zorder by {}".format(
-                    tabName, "(" + ",".join(partitionCols + optimizationCols) + ")"
+                    tabName,
+                    "(" + ",".join(partitionCols + optimizationCols + [ts_col]) + ")",
                 )
             )
         except ParseException as e:
@@ -65,8 +58,3 @@ def write(
                     e
                 )
             )
-    else:
-        logger.warning(
-            "Delta optimizations attempted on a non-Databricks platform. "
-            "Switch to use Databricks Runtime to get optimization advantages."
-        )
diff --git a/python/tempo/tsdf.py b/python/tempo/tsdf.py
index bbec6b78..29e517d2 100644
--- a/python/tempo/tsdf.py
+++ b/python/tempo/tsdf.py
@@ -13,6 +13,7 @@
 from pyspark.sql import SparkSession
 from pyspark.sql.column import Column
 from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.types import TimestampType
 from pyspark.sql.window import Window, WindowSpec
 from scipy.fft import fft, fftfreq  # type: ignore
 
@@ -1102,7 +1103,7 @@ def withRangeStats(
         ]
 
         # build window
-        if str(self.df.schema[self.ts_col].dataType) == "TimestampType":
+        if isinstance(self.df.schema[self.ts_col].dataType, TimestampType):
             self.df = self.__add_double_ts()
             prohibited_cols.extend(["double_ts"])
             w = self.__rangeBetweenWindow(
diff --git a/python/tempo/utils.py b/python/tempo/utils.py
index d539da1b..4a10ebfb 100644
--- a/python/tempo/utils.py
+++ b/python/tempo/utils.py
@@ -5,17 +5,15 @@
 import warnings
 from typing import List, Optional, Union, overload
 
+import pyspark.sql.functions as sfn
+import tempo.resample as t_resample
+import tempo.tsdf as t_tsdf
 from IPython import get_ipython
 from IPython.core.display import HTML
 from IPython.display import display as ipydisplay
 from pandas.core.frame import DataFrame as pandasDataFrame
-
-import pyspark.sql.functions as sfn
 from pyspark.sql.dataframe import DataFrame
 
-import tempo.resample as t_resample
-import tempo.tsdf as t_tsdf
-
 logger = logging.getLogger(__name__)
 
 IS_DATABRICKS = "DB_HOME" in os.environ.keys()
diff --git a/python/tests/base.py b/python/tests/base.py
index 7da859c8..cdba2845 100644
--- a/python/tests/base.py
+++ b/python/tests/base.py
@@ -5,12 +5,11 @@
 from typing import Union
 
 import jsonref
-from chispa import assert_df_equality
-
 import pyspark.sql.functions as sfn
+from chispa import assert_df_equality
+from delta.pip_utils import configure_spark_with_delta_pip
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-
 from tempo.intervals import IntervalsDF
 from tempo.tsdf import TSDF
 
@@ -28,9 +27,11 @@ class SparkTest(unittest.TestCase):
     def setUpClass(cls) -> None:
         # create and configure PySpark Session
         cls.spark = (
-            SparkSession.builder.appName("unit-tests")
-            .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+            configure_spark_with_delta_pip(SparkSession.builder.appName("unit-tests"))
+            .config(
+                "spark.sql.extensions",
+                "io.delta.sql.DeltaSparkSessionExtension",
+            )
             .config(
                 "spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog",
@@ -124,7 +125,7 @@ def __loadTestData(self, test_case_path: str) -> dict:
         :param test_case_path: string representation of the data path e.g. : "tsdf_tests.BasicTests.test_describe"
         :type test_case_path: str
         """
-        file_name, class_name, func_name = test_case_path.split(".")
+        file_name, class_name, func_name = test_case_path.split(".")[-3:]
 
         # find our test data file
         test_data_file = self.__getTestDataFilePath(file_name)
@@ -225,4 +226,5 @@ def assertDataFrameEquality(
             ignore_row_order=ignore_row_order,
             ignore_column_order=ignore_column_order,
             ignore_nullable=ignore_nullable,
+            ignore_metadata=True,
         )
diff --git a/python/tests/io_tests.py b/python/tests/io_tests.py
index 44b837e3..7a138218 100644
--- a/python/tests/io_tests.py
+++ b/python/tests/io_tests.py
@@ -1,10 +1,12 @@
 import logging
-import os
 import unittest
-from unittest import mock
+from importlib.metadata import version
 
+from packaging import version as pkg_version
 from tests.base import SparkTest
 
+DELTA_VERSION = version("delta-spark")
+
 
 class DeltaWriteTest(SparkTest):
     def test_write_to_delta_without_optimization_cols(self):
@@ -37,29 +39,6 @@ def test_write_to_delta_with_optimization_cols(self):
         # should be equal to the expected dataframe
         self.assertEqual(self.spark.table(table_name).count(), 7)
 
-    def test_write_to_delta_non_dbr_environment_logging(self):
-        """Test logging when writing"""
-
-        table_name = "my_table_optimization_col"
-
-        # load test data
-        input_tsdf = self.get_data_as_tsdf("input_data")
-
-        with self.assertLogs(level="WARNING") as warning_captured:
-            # test write to delta
-            input_tsdf.write(self.spark, table_name, ["date"])
-
-        self.assertEqual(len(warning_captured.records), 1)
-        self.assertEqual(
-            warning_captured.output,
-            [
-                "WARNING:tempo.io:"
-                "Delta optimizations attempted on a non-Databricks platform. "
-                "Switch to use Databricks Runtime to get optimization advantages."
-            ],
-        )
-
-    @mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "10.4"})
     def test_write_to_delta_bad_dbr_environment_logging(self):
         """Test useDeltaOpt Exception"""
 
@@ -68,25 +47,29 @@ def test_write_to_delta_bad_dbr_environment_logging(self):
         # load test data
         input_tsdf = self.get_data_as_tsdf("input_data")
 
-        with self.assertLogs(level="ERROR") as error_captured:
-            # test write to delta
-            input_tsdf.write(self.spark, table_name, ["date"])
-
-        self.assertEqual(len(error_captured.records), 1)
-        print(error_captured.output)
-        self.assertEqual(
-            error_captured.output,
-            [
-                "ERROR:tempo.io:"
-                "Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
-                "'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
-                "'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
-                "'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
-                "'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
-                "'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
-                "my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
-            ],
-        )
+        if pkg_version.parse(DELTA_VERSION) < pkg_version.parse("2.0.0"):
+
+            with self.assertLogs(level="ERROR") as error_captured:
+                # should fail to run optimize
+                input_tsdf.write(self.spark, table_name, ["date"])
+
+            self.assertEqual(len(error_captured.records), 1)
+            print(error_captured.output)
+            self.assertEqual(
+                error_captured.output,
+                [
+                    "ERROR:tempo.io:"
+                    "Delta optimizations attempted, but was not successful.\nError: \nmismatched input "
+                    "'optimize' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', "
+                    "'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', "
+                    "'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', "
+                    "'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', "
+                    "'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)\n\n== SQL ==\noptimize "
+                    "my_table_optimization_col_fails zorder by (symbol,date,event_time)\n^^^\n"
+                ],
+            )
+        else:
+            pass
 
 
 # MAIN
diff --git a/python/tests/requirements/dbr104.txt b/python/tests/requirements/dbr104.txt
new file mode 100644
index 00000000..4e2284cf
--- /dev/null
+++ b/python/tests/requirements/dbr104.txt
@@ -0,0 +1,7 @@
+delta-spark==1.1.0
+ipython==7.22.0
+numpy==1.20.1
+pandas==1.2.4
+pyarrow==4.0.0
+pyspark==3.2.1
+scipy==1.6.2
\ No newline at end of file
diff --git a/python/tests/requirements/dbr113.txt b/python/tests/requirements/dbr113.txt
new file mode 100644
index 00000000..a2fe6b88
--- /dev/null
+++ b/python/tests/requirements/dbr113.txt
@@ -0,0 +1,7 @@
+delta-spark==2.1.0
+ipython==7.32.0
+numpy==1.20.3
+pandas==1.3.4
+pyarrow==7.0.0
+pyspark==3.3.0
+scipy==1.7.1
\ No newline at end of file
diff --git a/python/tests/requirements/dbr122.txt b/python/tests/requirements/dbr122.txt
new file mode 100644
index 00000000..d5f44af9
--- /dev/null
+++ b/python/tests/requirements/dbr122.txt
@@ -0,0 +1,7 @@
+delta-spark==2.2.0
+ipython==8.5.0
+numpy==1.21.5
+pandas==1.4.2
+pyarrow==7.0.0
+pyspark==3.3.2
+scipy==1.7.3
\ No newline at end of file
diff --git a/python/tests/requirements/dbr133.txt b/python/tests/requirements/dbr133.txt
new file mode 100644
index 00000000..633a452c
--- /dev/null
+++ b/python/tests/requirements/dbr133.txt
@@ -0,0 +1,7 @@
+delta-spark==2.4.0
+ipython==8.10.0
+numpy==1.21.5
+pandas==1.4.4
+pyarrow==8.0.0
+pyspark==3.4.1
+scipy==1.9.1
\ No newline at end of file
diff --git a/python/tests/requirements/dbr142.txt b/python/tests/requirements/dbr142.txt
new file mode 100644
index 00000000..b4ad90dd
--- /dev/null
+++ b/python/tests/requirements/dbr142.txt
@@ -0,0 +1,7 @@
+delta-spark==3.0.0
+ipython==8.14.0
+numpy==1.23.5
+pandas==1.5.3
+pyarrow==8.0.0
+pyspark==3.5.0
+scipy==1.10.0
\ No newline at end of file
diff --git a/python/tests/requirements/dbr91.txt b/python/tests/requirements/dbr91.txt
new file mode 100644
index 00000000..faf44bb8
--- /dev/null
+++ b/python/tests/requirements/dbr91.txt
@@ -0,0 +1,7 @@
+delta-spark==1.0.0
+ipython==7.22.0
+numpy==1.19.2
+pandas==1.2.4
+pyarrow==4.0.0
+pyspark==3.1.2
+scipy==1.6.2
\ No newline at end of file
diff --git a/python/tests/requirements/dev.txt b/python/tests/requirements/dev.txt
new file mode 100644
index 00000000..c8090248
--- /dev/null
+++ b/python/tests/requirements/dev.txt
@@ -0,0 +1,4 @@
+chispa
+jsonref
+packaging
+python-dateutil
\ No newline at end of file
diff --git a/python/tests/tsdf_tests.py b/python/tests/tsdf_tests.py
index c36263e4..33af3155 100644
--- a/python/tests/tsdf_tests.py
+++ b/python/tests/tsdf_tests.py
@@ -876,8 +876,7 @@ def test_withPartitionCols(self):
         self.assertEqual(init_tsdf.partitionCols, [])
         self.assertEqual(actual_tsdf.partitionCols, ["symbol"])
 
-    def test_tsdf_interpolate(self):
-        ...
+    def test_tsdf_interpolate(self): ...
 
 
 class FourierTransformTest(SparkTest):
diff --git a/python/tox.ini b/python/tox.ini
index d6af2f91..f5283081 100644
--- a/python/tox.ini
+++ b/python/tox.ini
@@ -11,9 +11,7 @@ envlist =
    build-dist
; Mirror Supported LTS DBR versions here: https://docs.databricks.com/release-notes/runtime/
; Use correct PySpark version based on Python version present in env name
-    py37-pyspark300,
-    py38-pyspark{312,321},
-    py39-pyspark{330,332}
+    dbr{91,104,113,122,133,142}
 
 skip_missing_interpreters = true
 
@@ -23,14 +21,19 @@ package = wheel
 wheel_build_env = .pkg
 setenv =
     COVERAGE_FILE = .coverage.{envname}
+basepython =
+    dbr142: py310
+    dbr133: py310
+    dbr122: py39
+    dbr113: py39
+    dbr104: py38
+    dbr91: py38
+    dbr73: py37
 deps =
-    pyspark300: pyspark==3.0.0
-    pyspark312: pyspark==3.1.2
-    pyspark321: pyspark==3.2.1
-    pyspark330: pyspark==3.3.0
-    pyspark332: pyspark==3.3.2
+    -rtests/requirements/{envname}.txt
+    -rtests/requirements/dev.txt
     coverage>=7,<8
-    -rrequirements.txt
+
 commands =
     coverage --version
     coverage run -m unittest discover -s tests -p '*_tests.py'
@@ -63,7 +66,7 @@ deps =
     mypy>=1,<2
     pandas-stubs>=2,<3
    types-pytz>=2023,<2024
-    -rrequirements.txt
+    -rtests/requirements/dbr133.txt
 commands =
     mypy {toxinidir}/tempo
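
A note on the `python/tempo/io.py` change for anyone reviewing the patch above: `write` no longer probes `DATABRICKS_RUNTIME_VERSION` to decide whether to run Delta's `OPTIMIZE`; it attempts the optimization whenever the caller passes `optimizationCols`, and it always appends the TSDF's timestamp column to the Z-order list. A small, self-contained illustration of the SQL it ends up building (the table and column names here are hypothetical, not taken from the patch):

```python
# Hypothetical inputs standing in for the arguments tempo.io.write receives.
table_name = "my_table"
partition_cols = ["symbol", "date"]
optimization_cols = ["event_time"]
ts_col = "event_ts"

# Optimization is now opt-in: nothing runs unless optimizationCols was supplied.
if optimization_cols:
    # The timestamp column is always added to the end of the ZORDER BY list.
    zorder_cols = partition_cols + optimization_cols + [ts_col]
    statement = "optimize {} zorder by ({})".format(table_name, ",".join(zorder_cols))
    print(statement)
    # optimize my_table zorder by (symbol,date,event_time,event_ts)
```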
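The one-line change in `python/tempo/tsdf.py` swaps a string comparison of the timestamp column's data type for an `isinstance` check. The string form of a PySpark data type is not stable across the Spark versions covered by the new `dbr*` environments (newer releases render it with parentheses, e.g. `TimestampType()`), so comparing against the literal `"TimestampType"` can silently stop matching; `isinstance` avoids that. A minimal sketch of the difference, using a made-up schema:

```python
from pyspark.sql.types import StructField, StructType, TimestampType

# Made-up schema used only for illustration.
schema = StructType([StructField("event_ts", TimestampType(), True)])
field_type = schema["event_ts"].dataType

# Brittle: depends on how the installed PySpark release renders the type name.
is_timestamp_by_name = str(field_type) == "TimestampType"

# Robust across the PySpark versions tested here (3.1 through 3.5).
is_timestamp = isinstance(field_type, TimestampType)

print(is_timestamp_by_name, is_timestamp)
```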
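For the test harness itself, `python/tests/base.py` now builds the Spark session through `configure_spark_with_delta_pip` from the `delta-spark` package, so the Delta jar coordinates follow whichever `delta-spark` version the active `dbr*` requirements file installed, instead of the previously hard-coded `io.delta:delta-core_2.12:1.1.0`. The following sketch mirrors that setup and should run standalone wherever `pyspark` and `delta-spark` are installed together (for example from `python/tests/requirements/dbr133.txt`):

```python
from delta.pip_utils import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# The SQL extension and catalog are still configured explicitly, but the Delta
# Maven coordinates are injected by configure_spark_with_delta_pip to match
# the installed pip package.
builder = (
    SparkSession.builder.appName("unit-tests")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
print(spark.version)
```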
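Finally, the rewritten `io_tests.py` gates the failure-path assertion on the installed `delta-spark` version: the test treats 2.0.0 as the cut-off below which tempo's `OPTIMIZE ... ZORDER BY` statement is expected to fail to parse, so the error log is only asserted in the older `dbr91`/`dbr104` environments. A standalone sketch of that version check (the helper function name is illustrative, not part of the patch):

```python
from importlib.metadata import version

from packaging import version as pkg_version

# Resolved once at import time, mirroring the DELTA_VERSION constant in io_tests.py.
DELTA_VERSION = version("delta-spark")


def expects_optimize_parse_error() -> bool:
    """Return True when the installed delta-spark release predates 2.0.0,
    i.e. when the OPTIMIZE call is expected to raise a ParseException that
    tempo logs as an error instead of succeeding."""
    return pkg_version.parse(DELTA_VERSION) < pkg_version.parse("2.0.0")


if __name__ == "__main__":
    print(DELTA_VERSION, expects_optimize_parse_error())
```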