From f6ea91e3b4386a17f107944f10ff42937454a7b8 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Thu, 25 Jul 2024 14:26:46 -0400 Subject: [PATCH 1/3] Clean up unused methods. Prepare for file_system argument. --- src/.pylintrc | 6 +-- src/hipscat/io/__init__.py | 8 +--- src/hipscat/io/file_io/__init__.py | 2 +- src/hipscat/io/file_io/file_io.py | 40 ++++++++++++------- src/hipscat/io/file_io/file_pointer.py | 4 +- src/hipscat/io/write_metadata.py | 16 -------- tests/conftest.py | 6 --- .../catalog/loaders/test_read_from_hipscat.py | 16 ++++++++ tests/hipscat/io/conftest.py | 3 +- tests/hipscat/io/test_write_metadata.py | 27 ------------- .../pixel_math/test_partition_stats.py | 1 - 11 files changed, 52 insertions(+), 77 deletions(-) diff --git a/src/.pylintrc b/src/.pylintrc index 39dac7b5..6ef32b67 100644 --- a/src/.pylintrc +++ b/src/.pylintrc @@ -493,16 +493,16 @@ score=yes ignore-comments=yes # Docstrings are removed from the similarity computation -ignore-docstrings=yes +ignore-docstrings=no # Imports are removed from the similarity computation ignore-imports=yes # Signatures are removed from the similarity computation -ignore-signatures=yes +ignore-signatures=no # Minimum lines number of a similarity. 
-min-similarity-lines=6 +min-similarity-lines=8 [SPELLING] diff --git a/src/hipscat/io/__init__.py b/src/hipscat/io/__init__.py index 78098bf5..409c1da6 100644 --- a/src/hipscat/io/__init__.py +++ b/src/hipscat/io/__init__.py @@ -4,6 +4,7 @@ from .parquet_metadata import ( read_row_group_fragments, row_group_stat_single_value, + write_parquet_metadata, write_parquet_metadata_for_batches, ) from .paths import ( @@ -18,9 +19,4 @@ pixel_catalog_file, pixel_directory, ) -from .write_metadata import ( - write_catalog_info, - write_parquet_metadata, - write_partition_info, - write_provenance_info, -) +from .write_metadata import write_catalog_info, write_partition_info, write_provenance_info diff --git a/src/hipscat/io/file_io/__init__.py b/src/hipscat/io/file_io/__init__.py index 7a1db5c7..95358487 100644 --- a/src/hipscat/io/file_io/__init__.py +++ b/src/hipscat/io/file_io/__init__.py @@ -1,13 +1,13 @@ from .file_io import ( delete_file, load_csv_to_pandas, + load_csv_to_pandas_generator, load_json_file, load_parquet_to_pandas, load_text_file, make_directory, read_fits_image, read_parquet_dataset, - read_parquet_file, read_parquet_file_to_pandas, read_parquet_metadata, remove_directory, diff --git a/src/hipscat/io/file_io/file_io.py b/src/hipscat/io/file_io/file_io.py index 9f7f9761..11c8211d 100644 --- a/src/hipscat/io/file_io/file_io.py +++ b/src/hipscat/io/file_io/file_io.py @@ -3,7 +3,7 @@ import json import tempfile import warnings -from typing import Any, Dict, Tuple, Union +from typing import Any, Dict, Generator, Tuple, Union import numpy as np import pandas as pd @@ -143,9 +143,32 @@ def load_csv_to_pandas( Returns: pandas dataframe loaded from CSV """ + file_system, file_pointer = get_fs(file_pointer, storage_options=storage_options) + with file_system.open(file_pointer, "r") as csv_file: + frame = pd.read_csv(csv_file, **kwargs) + return frame - pd_storage_option = unnest_headers_for_pandas(storage_options) - return pd.read_csv(file_pointer, 
storage_options=pd_storage_option, **kwargs) + +def load_csv_to_pandas_generator( + file_pointer: FilePointer, + chunksize=10_000, + *, + storage_options: Union[Dict[Any, Any], None] = None, + **kwargs, +) -> Generator[pd.DataFrame, None, None]: + """Load a csv file to a pandas dataframe, in chunks + Args: + file_pointer: location of csv file to load + chunksize: number of rows to load per chunk, default 10,000 + storage_options: dictionary that contains abstract filesystem credentials + **kwargs: arguments to pass to pandas `read_csv` loading method + Yields: + pandas dataframes of up to ``chunksize`` rows, loaded from the CSV + """ + file_system, file_pointer = get_fs(file_pointer, storage_options=storage_options) + with file_system.open(file_pointer, "r") as csv_file: + with pd.read_csv(csv_file, chunksize=chunksize, **kwargs) as reader: + yield from reader def load_parquet_to_pandas( @@ -251,17 +274,6 @@ def read_parquet_dataset( return (source, dataset) -def read_parquet_file(file_pointer: FilePointer, storage_options: Union[Dict[Any, Any], None] = None): - """Read parquet file from file pointer. 
- - Args: - file_pointer: location of file to read metadata from - storage_options: dictionary that contains abstract filesystem credentials - """ - file_system, file_pointer = get_fs(file_pointer, storage_options=storage_options) - return pq.ParquetFile(file_pointer, filesystem=file_system) - - def write_parquet_metadata( schema: Any, file_pointer: FilePointer, diff --git a/src/hipscat/io/file_io/file_pointer.py b/src/hipscat/io/file_io/file_pointer.py index 59516d59..71136b7d 100644 --- a/src/hipscat/io/file_io/file_pointer.py +++ b/src/hipscat/io/file_io/file_pointer.py @@ -71,8 +71,8 @@ def get_file_pointer_for_fs(protocol: str, file_pointer: FilePointer) -> FilePoi split_pointer = file_pointer.split("file://")[1] else: split_pointer = file_pointer - elif protocol == "https": - # https should include the protocol in the file path + elif "http" in protocol: + # http/https should include the protocol in the file path split_pointer = file_pointer # don't split else: split_pointer = file_pointer.split(f"{protocol}://")[1] diff --git a/src/hipscat/io/write_metadata.py b/src/hipscat/io/write_metadata.py index 2afda08d..7333ac2e 100644 --- a/src/hipscat/io/write_metadata.py +++ b/src/hipscat/io/write_metadata.py @@ -11,7 +11,6 @@ import pandas as pd from hipscat.io import file_io, paths -from hipscat.io.parquet_metadata import write_parquet_metadata as wpm from hipscat.pixel_math.healpix_pixel import HealpixPixel @@ -127,21 +126,6 @@ def write_partition_info( ) -def write_parquet_metadata(catalog_path, storage_options: Union[Dict[Any, Any], None] = None): - """Generate parquet metadata, using the already-partitioned parquet files - for this catalog - - Args: - catalog_path (str): base path for the catalog - storage_options: dictionary that contains abstract filesystem credentials - """ - wpm( - catalog_path=catalog_path, - storage_options=storage_options, - output_path=catalog_path, - ) - - def write_fits_map(catalog_path, histogram: np.ndarray, storage_options: 
Union[Dict[Any, Any], None] = None): """Write the object spatial distribution information to a healpix FITS file. diff --git a/tests/conftest.py b/tests/conftest.py index b33b8714..e7e44c21 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,6 @@ ALMANAC_DIR_NAME = "almanac" SMALL_SKY_DIR_NAME = "small_sky" SMALL_SKY_ORDER1_DIR_NAME = "small_sky_order1" -SMALL_SKY_TO_SMALL_SKY_ORDER1_DIR_NAME = "small_sky_to_small_sky_order1" SMALL_SKY_SOURCE_OBJECT_INDEX_DIR_NAME = "small_sky_source_object_index" TEST_DIR = os.path.dirname(__file__) @@ -47,11 +46,6 @@ def small_sky_order1_dir(test_data_dir): return test_data_dir / SMALL_SKY_ORDER1_DIR_NAME -@pytest.fixture -def small_sky_to_small_sky_order1_dir(test_data_dir): - return test_data_dir / SMALL_SKY_TO_SMALL_SKY_ORDER1_DIR_NAME - - @pytest.fixture def small_sky_source_object_index_dir(test_data_dir): return test_data_dir / SMALL_SKY_SOURCE_OBJECT_INDEX_DIR_NAME diff --git a/tests/hipscat/catalog/loaders/test_read_from_hipscat.py b/tests/hipscat/catalog/loaders/test_read_from_hipscat.py index f19da5cb..67ed2f32 100644 --- a/tests/hipscat/catalog/loaders/test_read_from_hipscat.py +++ b/tests/hipscat/catalog/loaders/test_read_from_hipscat.py @@ -9,3 +9,19 @@ def test_read_from_hipscat_wrong_catalog_type(small_sky_dir): read_from_hipscat(small_sky_dir, catalog_type=CatalogType.ASSOCIATION) with pytest.raises(NotImplementedError, match="load catalog of type"): read_from_hipscat(small_sky_dir, catalog_type="unknown") + + +def test_read_hipscat_branches( + small_sky_dir, + small_sky_order1_dir, + association_catalog_path, + small_sky_source_object_index_dir, + margin_catalog_path, + small_sky_source_dir, +): + read_from_hipscat(small_sky_dir) + read_from_hipscat(small_sky_order1_dir) + read_from_hipscat(association_catalog_path) + read_from_hipscat(small_sky_source_object_index_dir) + read_from_hipscat(margin_catalog_path) + read_from_hipscat(small_sky_source_dir) diff --git a/tests/hipscat/io/conftest.py 
b/tests/hipscat/io/conftest.py index f0fd09b1..d635536a 100644 --- a/tests/hipscat/io/conftest.py +++ b/tests/hipscat/io/conftest.py @@ -3,6 +3,7 @@ import re import numpy.testing as npt +import pyarrow.parquet as pq import pytest from hipscat.io import file_io @@ -64,7 +65,7 @@ def check_parquet_schema(file_name, expected_schema, expected_num_row_groups=1): assert schema.equals(expected_schema, check_metadata=False) - parquet_file = file_io.read_parquet_file(file_name) + parquet_file = pq.ParquetFile(file_name) assert parquet_file.metadata.num_row_groups == expected_num_row_groups for row_index in range(0, parquet_file.metadata.num_row_groups): diff --git a/tests/hipscat/io/test_write_metadata.py b/tests/hipscat/io/test_write_metadata.py index 8df9bc96..a8d90848 100644 --- a/tests/hipscat/io/test_write_metadata.py +++ b/tests/hipscat/io/test_write_metadata.py @@ -1,6 +1,5 @@ """Tests of file IO (reads and writes)""" -import shutil from pathlib import Path import numpy as np @@ -178,32 +177,6 @@ def test_write_partition_info_float(assert_text_file_matches, tmp_path): assert_text_file_matches(expected_lines, metadata_filename) -def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema): - """Copy existing catalog and create new metadata files for it""" - catalog_base_dir = tmp_path / "catalog" - shutil.copytree( - small_sky_dir, - catalog_base_dir, - ) - io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) - ## _common_metadata has 0 row groups - check_parquet_schema( - catalog_base_dir / "_common_metadata", - small_sky_schema, - 0, - ) - ## Re-write - should still have the same properties. 
- io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) - ## _common_metadata has 0 row groups - check_parquet_schema( - catalog_base_dir / "_common_metadata", - small_sky_schema, - 0, - ) - - def test_read_write_fits_point_map(tmp_path): """Check that we write and can read a FITS file for spatial distribution.""" initial_histogram = hist.empty_histogram(1) diff --git a/tests/hipscat/pixel_math/test_partition_stats.py b/tests/hipscat/pixel_math/test_partition_stats.py index f34b3a28..bc995936 100644 --- a/tests/hipscat/pixel_math/test_partition_stats.py +++ b/tests/hipscat/pixel_math/test_partition_stats.py @@ -169,7 +169,6 @@ def test_alignment_small_sky_order2(drop_empty_siblings): (0, 11, 131), ] expected[176:192] = tuples - print(result[176:192]) npt.assert_array_equal(result, expected) From 33efe8887ef9cd8ca01326db5ec262a4fe825fde Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Fri, 26 Jul 2024 07:59:42 -0400 Subject: [PATCH 2/3] Increase test coverage --- tests/hipscat/io/file_io/test_file_io.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/hipscat/io/file_io/test_file_io.py b/tests/hipscat/io/file_io/test_file_io.py index 362ba95d..038b8e34 100644 --- a/tests/hipscat/io/file_io/test_file_io.py +++ b/tests/hipscat/io/file_io/test_file_io.py @@ -7,6 +7,8 @@ from hipscat.io.file_io import ( delete_file, get_file_pointer_from_path, + load_csv_to_pandas, + load_csv_to_pandas_generator, load_json_file, load_parquet_to_pandas, make_directory, @@ -115,6 +117,21 @@ def test_load_json(small_sky_dir): assert loaded_json_dict == json_dict +def test_load_csv_to_pandas(small_sky_source_dir): + partition_info_path = small_sky_source_dir / "partition_info.csv" + frame = load_csv_to_pandas(partition_info_path) + assert len(frame) == 14 + + +def test_load_csv_to_pandas_generator(small_sky_source_dir): + partition_info_path = small_sky_source_dir / "partition_info.csv" + num_reads 
= 0 + for frame in load_csv_to_pandas_generator(partition_info_path, chunksize=7): + assert len(frame) == 7 + num_reads += 1 + assert num_reads == 2 + + def test_load_parquet_to_pandas(small_sky_dir): pixel_data_path = pixel_catalog_file(small_sky_dir, 0, 11) parquet_df = pd.read_parquet(pixel_data_path) From e37ba01bb89e164b4d79ba91f2b4344d149f653e Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com> Date: Fri, 26 Jul 2024 12:48:37 -0400 Subject: [PATCH 3/3] Update src/hipscat/io/file_io/file_pointer.py Co-authored-by: Konstantin Malanchev --- src/hipscat/io/file_io/file_pointer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hipscat/io/file_io/file_pointer.py b/src/hipscat/io/file_io/file_pointer.py index 71136b7d..bbaa2e1f 100644 --- a/src/hipscat/io/file_io/file_pointer.py +++ b/src/hipscat/io/file_io/file_pointer.py @@ -71,7 +71,7 @@ def get_file_pointer_for_fs(protocol: str, file_pointer: FilePointer) -> FilePoi split_pointer = file_pointer.split("file://")[1] else: split_pointer = file_pointer - elif "http" in protocol: + elif protocol.startswith("http"): # http/https should include the protocol in the file path split_pointer = file_pointer # don't split else: