From 3f21799076d72d85c724eed6d984f873a507b1e9 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Mon, 22 Jul 2024 14:57:34 -0400 Subject: [PATCH 1/3] Initialize catalog with arrow schema --- .../association_catalog.py | 18 ++++++++++-------- src/hipscat/catalog/catalog.py | 8 +++++--- .../healpix_dataset/healpix_dataset.py | 19 +++++++++++++++++-- .../catalog/margin_cache/margin_catalog.py | 7 +++++-- 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/hipscat/catalog/association_catalog/association_catalog.py b/src/hipscat/catalog/association_catalog/association_catalog.py index b74fc135..572a686c 100644 --- a/src/hipscat/catalog/association_catalog/association_catalog.py +++ b/src/hipscat/catalog/association_catalog/association_catalog.py @@ -11,6 +11,7 @@ from hipscat.catalog.catalog_type import CatalogType from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset, PixelInputTypes from hipscat.io import FilePointer, file_io, paths +import pyarrow as pa class AssociationCatalog(HealpixDataset): @@ -29,17 +30,18 @@ class AssociationCatalog(HealpixDataset): JoinPixelInputTypes = Union[list, pd.DataFrame, PartitionJoinInfo] def __init__( - self, - catalog_info: CatalogInfoClass, - pixels: PixelInputTypes, - join_pixels: JoinPixelInputTypes, - catalog_path=None, - moc: MOC | None = None, - storage_options: Union[Dict[Any, Any], None] = None, + self, + catalog_info: CatalogInfoClass, + pixels: PixelInputTypes, + join_pixels: JoinPixelInputTypes, + catalog_path=None, + moc: MOC | None = None, + schema: pa.Schema | None = None, + storage_options: Union[Dict[Any, Any], None] = None, ) -> None: if not catalog_info.catalog_type == CatalogType.ASSOCIATION: raise ValueError("Catalog info `catalog_type` must be 'association'") - super().__init__(catalog_info, pixels, catalog_path, moc=moc, storage_options=storage_options) + super().__init__(catalog_info, pixels, catalog_path, moc=moc, schema=schema, storage_options=storage_options) self.join_info = self._get_partition_join_info_from_pixels(join_pixels) def get_join_pixels(self) -> pd.DataFrame: diff --git a/src/hipscat/catalog/catalog.py b/src/hipscat/catalog/catalog.py index b911184b..13b63ed3 100644 --- a/src/hipscat/catalog/catalog.py +++ b/src/hipscat/catalog/catalog.py @@ -23,7 +23,7 @@ validate_radius, ) from hipscat.pixel_tree.negative_tree import compute_negative_tree_pixels - +import pyarrow as pa class Catalog(HealpixDataset): """A HiPSCat Catalog with data stored in a HEALPix Hive partitioned structure @@ -46,6 +46,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: """Initializes a Catalog @@ -56,8 +57,9 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ if catalog_info.catalog_type not in self.HIPS_CATALOG_TYPES: raise ValueError( @@ -65,7 +67,7 @@ def __init__( f"{', '.join([t.value for t in self.HIPS_CATALOG_TYPES])}" ) super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options + catalog_info, pixels, catalog_path=catalog_path, moc=moc, schema=schema, storage_options=storage_options ) def filter_by_cone(self, ra: float, dec: float, radius_arcsec: float) -> Catalog: diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py index 572da159..ca32cd83 100644 --- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py +++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py @@ -12,11 +12,13 @@ from hipscat.catalog.dataset import BaseCatalogInfo, Dataset from hipscat.catalog.partition_info import PartitionInfo from hipscat.io import FilePointer, file_io, paths +from hipscat.io.file_io import read_parquet_metadata from hipscat.pixel_math import HealpixPixel from hipscat.pixel_tree import PixelAlignment, PixelAlignmentType from hipscat.pixel_tree.moc_filter import filter_by_moc from hipscat.pixel_tree.pixel_alignment import align_with_mocs from hipscat.pixel_tree.pixel_tree import PixelTree +import pyarrow as pa PixelInputTypes = Union[PartitionInfo, PixelTree, List[HealpixPixel]] @@ -39,6 +41,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: Union[Dict[Any, Any], None] = None, ) -> None: """Initializes a Catalog @@ -49,13 +52,15 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ super().__init__(catalog_info, catalog_path=catalog_path, storage_options=storage_options) self.partition_info = self._get_partition_info_from_pixels(pixels) self.pixel_tree = self._get_pixel_tree_from_pixels(pixels) self.moc = moc + self.schema = schema def get_healpix_pixels(self) -> List[HealpixPixel]: """Get healpix pixel objects for all pixels contained in the catalog. @@ -101,6 +106,7 @@ def _read_kwargs( ) -> dict: kwargs = super()._read_kwargs(catalog_base_dir, storage_options=storage_options) kwargs["moc"] = cls._read_moc_from_point_map(catalog_base_dir, storage_options) + kwargs["schema"] = cls._read_schema_from_metadata(catalog_base_dir, storage_options) return kwargs @classmethod @@ -118,6 +124,15 @@ def _read_moc_from_point_map( orders = np.full(ipix.shape, order) return MOC.from_healpix_cells(ipix, orders, order) + @classmethod + def _read_schema_from_metadata(cls, catalog_base_dir: FilePointer, storage_options: dict | None = None) -> pa.Schema | None: + """Reads the schema information stored in the _metadata file""" + common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir) + if file_io.does_file_or_directory_exist(common_metadata_file, storage_options=storage_options): + metadata = read_parquet_metadata(common_metadata_file, storage_options=storage_options) + return metadata.schema.to_arrow_schema() + return None + @classmethod def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None): super()._check_files_exist(catalog_base_dir, storage_options=storage_options) @@ -170,7 +185,7 @@ def filter_by_moc(self, moc: MOC) -> Self: filtered_tree = filter_by_moc(self.pixel_tree, moc) filtered_moc = self.moc.intersection(moc) if self.moc is not None else None filtered_catalog_info = dataclasses.replace(self.catalog_info, total_rows=None) - return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc) + return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc, schema=self.schema) def align( self, other_cat: Self, alignment_type: PixelAlignmentType = PixelAlignmentType.INNER diff --git a/src/hipscat/catalog/margin_cache/margin_catalog.py b/src/hipscat/catalog/margin_cache/margin_catalog.py index 53bd174a..bdafe4b0 100644 --- a/src/hipscat/catalog/margin_cache/margin_catalog.py +++ b/src/hipscat/catalog/margin_cache/margin_catalog.py @@ -8,6 +8,7 @@ from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset, PixelInputTypes from hipscat.catalog.margin_cache import MarginCacheCatalogInfo from hipscat.pixel_tree.moc_utils import copy_moc +import pyarrow as pa class MarginCatalog(HealpixDataset): @@ -30,6 +31,7 @@ def __init__( pixels: PixelInputTypes, catalog_path: str = None, moc: MOC | None = None, + schema: pa.Schema | None = None, storage_options: dict | None = None, ) -> None: """Initializes a Margin Catalog @@ -40,13 +42,14 @@ def __init__( list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object catalog_path: If the catalog is stored on disk, specify the location of the catalog Does not load the catalog from this path, only store as metadata - storage_options: dictionary that contains abstract filesystem credentials moc (mocpy.MOC): MOC object representing the coverage of the catalog + schema (pa.Schema): The pyarrow schema for the catalog + storage_options: dictionary that contains abstract filesystem credentials """ if catalog_info.catalog_type != CatalogType.MARGIN: raise ValueError(f"Catalog info `catalog_type` must equal {CatalogType.MARGIN}") super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options + catalog_info, pixels, catalog_path=catalog_path, moc=moc, schema=schema, storage_options=storage_options ) def filter_by_moc(self, moc: MOC) -> Self: From 350c9167d1712ad429fbe560d0fee499fcdb1609 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Mon, 22 Jul 2024 14:59:03 -0400 Subject: [PATCH 2/3] Apply formatting fixes --- .../association_catalog.py | 22 ++++++++++--------- src/hipscat/catalog/catalog.py | 10 +++++++-- .../healpix_dataset/healpix_dataset.py | 6 +++-- .../catalog/margin_cache/margin_catalog.py | 9 ++++++-- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/hipscat/catalog/association_catalog/association_catalog.py b/src/hipscat/catalog/association_catalog/association_catalog.py index 572a686c..012c8f4f 100644 --- a/src/hipscat/catalog/association_catalog/association_catalog.py +++ b/src/hipscat/catalog/association_catalog/association_catalog.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Tuple, Union import pandas as pd +import pyarrow as pa from mocpy import MOC from typing_extensions import TypeAlias @@ -11,7 +12,6 @@ from hipscat.catalog.catalog_type import CatalogType from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset, PixelInputTypes from hipscat.io import FilePointer, file_io, paths -import pyarrow as pa class AssociationCatalog(HealpixDataset): @@ -30,18 +30,20 @@ class AssociationCatalog(HealpixDataset): JoinPixelInputTypes = Union[list, pd.DataFrame, PartitionJoinInfo] def __init__( - self, - catalog_info: CatalogInfoClass, - pixels: PixelInputTypes, - join_pixels: JoinPixelInputTypes, - catalog_path=None, - moc: MOC | None = None, - schema: pa.Schema | None = None, - storage_options: Union[Dict[Any, Any], None] = None, + self, + catalog_info: CatalogInfoClass, + pixels: PixelInputTypes, + join_pixels: JoinPixelInputTypes, + catalog_path=None, + moc: MOC | None = None, + schema: pa.Schema | None = None, + storage_options: Union[Dict[Any, Any], None] = None, ) -> None: if not catalog_info.catalog_type == CatalogType.ASSOCIATION: raise ValueError("Catalog info `catalog_type` must be 'association'") - super().__init__(catalog_info, pixels, catalog_path, moc=moc, schema=schema, storage_options=storage_options) + super().__init__( + catalog_info, pixels, catalog_path, moc=moc, schema=schema, storage_options=storage_options + ) self.join_info = self._get_partition_join_info_from_pixels(join_pixels) def get_join_pixels(self) -> pd.DataFrame: diff --git a/src/hipscat/catalog/catalog.py b/src/hipscat/catalog/catalog.py index 13b63ed3..ccb8a929 100644 --- a/src/hipscat/catalog/catalog.py +++ b/src/hipscat/catalog/catalog.py @@ -5,6 +5,7 @@ from typing import Any, Dict, List, Tuple, Union import numpy as np +import pyarrow as pa from mocpy import MOC from typing_extensions import TypeAlias @@ -23,7 +24,7 @@ validate_radius, ) from hipscat.pixel_tree.negative_tree import compute_negative_tree_pixels -import pyarrow as pa + class Catalog(HealpixDataset): """A HiPSCat Catalog with data stored in a HEALPix Hive partitioned structure @@ -67,7 +68,12 @@ def __init__( f"{', '.join([t.value for t in self.HIPS_CATALOG_TYPES])}" ) super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, schema=schema, storage_options=storage_options + catalog_info, + pixels, + catalog_path=catalog_path, + moc=moc, + schema=schema, + storage_options=storage_options, ) def filter_by_cone(self, ra: float, dec: float, radius_arcsec: float) -> Catalog: diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py index ca32cd83..e793f86f 100644 --- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py +++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +import pyarrow as pa from mocpy import MOC from typing_extensions import Self, TypeAlias @@ -18,7 +19,6 @@ from hipscat.pixel_tree.moc_filter import filter_by_moc from hipscat.pixel_tree.pixel_alignment import align_with_mocs from hipscat.pixel_tree.pixel_tree import PixelTree -import pyarrow as pa PixelInputTypes = Union[PartitionInfo, PixelTree, List[HealpixPixel]] @@ -125,7 +125,9 @@ def _read_moc_from_point_map( return MOC.from_healpix_cells(ipix, orders, order) @classmethod - def _read_schema_from_metadata(cls, catalog_base_dir: FilePointer, storage_options: dict | None = None) -> pa.Schema | None: + def _read_schema_from_metadata( + cls, catalog_base_dir: FilePointer, storage_options: dict | None = None + ) -> pa.Schema | None: """Reads the schema information stored in the _metadata file""" common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir) if file_io.does_file_or_directory_exist(common_metadata_file, storage_options=storage_options): diff --git a/src/hipscat/catalog/margin_cache/margin_catalog.py b/src/hipscat/catalog/margin_cache/margin_catalog.py index bdafe4b0..6a882d6f 100644 --- a/src/hipscat/catalog/margin_cache/margin_catalog.py +++ b/src/hipscat/catalog/margin_cache/margin_catalog.py @@ -1,5 +1,6 @@ from __future__ import annotations +import pyarrow as pa from mocpy import MOC from typing_extensions import Self, TypeAlias @@ -8,7 +9,6 @@ from hipscat.catalog.healpix_dataset.healpix_dataset import HealpixDataset, PixelInputTypes from hipscat.catalog.margin_cache import MarginCacheCatalogInfo from hipscat.pixel_tree.moc_utils import copy_moc -import pyarrow as pa class MarginCatalog(HealpixDataset): @@ -49,7 +49,12 @@ def __init__( if catalog_info.catalog_type != CatalogType.MARGIN: raise ValueError(f"Catalog info `catalog_type` must equal {CatalogType.MARGIN}") super().__init__( - catalog_info, pixels, catalog_path=catalog_path, moc=moc, schema=schema, storage_options=storage_options + catalog_info, + pixels, + catalog_path=catalog_path, + moc=moc, + schema=schema, + storage_options=storage_options, ) def filter_by_moc(self, moc: MOC) -> Self: From 6977e0234099c8fd47faeb523c60abc0d423c392 Mon Sep 17 00:00:00 2001 From: Sandro Campos Date: Thu, 25 Jul 2024 13:47:55 -0400 Subject: [PATCH 3/3] Ensure schema has been set in unit tests --- .../healpix_dataset/healpix_dataset.py | 22 ++++-- tests/conftest.py | 71 +++++++++++++++++++ .../test_association_catalog.py | 11 ++- .../margin_cache/test_margin_catalog.py | 6 +- tests/hipscat/catalog/test_catalog.py | 11 ++- tests/hipscat/io/conftest.py | 18 ----- tests/hipscat/io/test_parquet_metadata.py | 24 +++---- tests/hipscat/io/test_write_metadata.py | 12 ++-- 8 files changed, 126 insertions(+), 49 deletions(-) diff --git a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py index e793f86f..419dd32f 100644 --- a/src/hipscat/catalog/healpix_dataset/healpix_dataset.py +++ b/src/hipscat/catalog/healpix_dataset/healpix_dataset.py @@ -1,6 +1,7 @@ from __future__ import annotations import dataclasses +import warnings from typing import Any, Dict, List, Tuple, Union import numpy as np @@ -128,17 +129,26 @@ def _read_moc_from_point_map( def _read_schema_from_metadata( cls, catalog_base_dir: FilePointer, storage_options: dict | None = None ) -> pa.Schema | None: - """Reads the schema information stored in the _metadata file""" + """Reads the schema information stored in the _common_metadata or _metadata files.""" common_metadata_file = paths.get_common_metadata_pointer(catalog_base_dir) - if file_io.does_file_or_directory_exist(common_metadata_file, storage_options=storage_options): - metadata = read_parquet_metadata(common_metadata_file, storage_options=storage_options) - return metadata.schema.to_arrow_schema() - return None + common_metadata_exists = file_io.does_file_or_directory_exist( + common_metadata_file, storage_options=storage_options + ) + metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) + metadata_exists = file_io.does_file_or_directory_exist(metadata_file, storage_options=storage_options) + if not (common_metadata_exists or metadata_exists): + warnings.warn( + "_common_metadata or _metadata files not found for this catalog." + "The arrow schema will not be set." + ) + return None + schema_file = common_metadata_file if common_metadata_exists else metadata_file + metadata = read_parquet_metadata(schema_file, storage_options=storage_options) + return metadata.schema.to_arrow_schema() @classmethod def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None): super()._check_files_exist(catalog_base_dir, storage_options=storage_options) - partition_info_file = paths.get_partition_info_pointer(catalog_base_dir) metadata_file = paths.get_parquet_metadata_pointer(catalog_base_dir) if not ( diff --git a/tests/conftest.py b/tests/conftest.py index ce08dc6d..b33b8714 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ from typing import List import pandas as pd +import pyarrow as pa import pytest from hipscat.catalog.association_catalog.association_catalog_info import AssociationCatalogInfo @@ -171,6 +172,76 @@ def index_catalog_info_with_extra() -> dict: } +@pytest.fixture +def small_sky_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("ra", pa.float64()), + pa.field("dec", pa.float64()), + pa.field("ra_error", pa.int64()), + pa.field("dec_error", pa.int64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + ] + ) + + +@pytest.fixture +def small_sky_source_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("source_id", pa.int64()), + pa.field("source_ra", pa.float64()), + pa.field("source_dec", pa.float64()), + pa.field("mjd", pa.float64()), + pa.field("mag", pa.float64()), + pa.field("band", pa.string()), + pa.field("object_id", pa.int64()), + pa.field("object_ra", pa.float64()), + pa.field("object_dec", pa.float64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + ] + ) + + +@pytest.fixture +def association_catalog_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("Norder", pa.int64()), + pa.field("Npix", pa.int64()), + pa.field("join_Norder", pa.int64()), + pa.field("join_Npix", pa.int64()), + ] + ) + + +@pytest.fixture +def margin_catalog_schema() -> pa.Schema: + return pa.schema( + [ + pa.field("id", pa.int64()), + pa.field("ra", pa.float64()), + pa.field("dec", pa.float64()), + pa.field("ra_error", pa.int64()), + pa.field("dec_error", pa.int64()), + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + pa.field("_hipscat_index", pa.uint64()), + pa.field("margin_Norder", pa.uint8()), + pa.field("margin_Dir", pa.uint64()), + pa.field("margin_Npix", pa.uint64()), + ] + ) + + @pytest.fixture def dataset_path(test_data_dir) -> str: return test_data_dir / "info_only" / "dataset" diff --git a/tests/hipscat/catalog/association_catalog/test_association_catalog.py b/tests/hipscat/catalog/association_catalog/test_association_catalog.py index 79fdf2a0..52e6bc80 100644 --- a/tests/hipscat/catalog/association_catalog/test_association_catalog.py +++ b/tests/hipscat/catalog/association_catalog/test_association_catalog.py @@ -2,6 +2,7 @@ import os import pandas as pd +import pyarrow as pa import pytest from hipscat.catalog import CatalogType @@ -49,7 +50,9 @@ def test_different_join_pixels_type(association_catalog_info, association_catalo pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels) -def test_read_from_file(association_catalog_path, association_catalog_join_pixels): +def test_read_from_file( + association_catalog_path, association_catalog_join_pixels, association_catalog_schema +): catalog = read_from_hipscat(association_catalog_path) assert isinstance(catalog, AssociationCatalog) @@ -66,6 +69,9 @@ def test_read_from_file(association_catalog_path, association_catalog_join_pixel assert info.join_catalog == "small_sky_order1" assert info.join_column == "id" + assert isinstance(catalog.schema, pa.Schema) + assert catalog.schema.equals(association_catalog_schema) + def test_empty_directory(tmp_path, association_catalog_info_data, association_catalog_join_pixels): """Test loading empty or incomplete data""" @@ -121,5 +127,6 @@ def test_csv_round_trip(tmp_path, association_catalog_info_data, association_cat part_info = PartitionJoinInfo(association_catalog_join_pixels) part_info.write_to_csv(catalog_path=catalog_path) - catalog = read_from_hipscat(catalog_path) + with pytest.warns(UserWarning, match="_common_metadata or _metadata files not found"): + catalog = read_from_hipscat(catalog_path) pd.testing.assert_frame_equal(catalog.get_join_pixels(), association_catalog_join_pixels) diff --git a/tests/hipscat/catalog/margin_cache/test_margin_catalog.py b/tests/hipscat/catalog/margin_cache/test_margin_catalog.py index 30f80272..aa260b30 100644 --- a/tests/hipscat/catalog/margin_cache/test_margin_catalog.py +++ b/tests/hipscat/catalog/margin_cache/test_margin_catalog.py @@ -1,6 +1,7 @@ import json import os +import pyarrow as pa import pytest from hipscat.catalog import CatalogType, MarginCatalog, PartitionInfo @@ -32,7 +33,7 @@ def test_wrong_catalog_info_type(catalog_info, margin_catalog_pixels): MarginCatalog(catalog_info, margin_catalog_pixels) -def test_read_from_file(margin_catalog_path, margin_catalog_pixels): +def test_read_from_file(margin_catalog_path, margin_catalog_pixels, margin_catalog_schema): catalog = read_from_hipscat(margin_catalog_path) assert isinstance(catalog, MarginCatalog) @@ -50,6 +51,9 @@ def test_read_from_file(margin_catalog_path, margin_catalog_pixels): assert info.primary_catalog == "small_sky_order1" assert info.margin_threshold == 7200 + assert isinstance(catalog.schema, pa.Schema) + assert catalog.schema.equals(margin_catalog_schema) + # pylint: disable=duplicate-code def test_empty_directory(tmp_path, margin_cache_catalog_info_data, margin_catalog_pixels): diff --git a/tests/hipscat/catalog/test_catalog.py b/tests/hipscat/catalog/test_catalog.py index 9c4e07dd..91778d0f 100644 --- a/tests/hipscat/catalog/test_catalog.py +++ b/tests/hipscat/catalog/test_catalog.py @@ -4,6 +4,7 @@ import astropy.units as u import numpy as np +import pyarrow as pa import pytest from mocpy import MOC @@ -79,7 +80,7 @@ def test_get_pixels_list(catalog_info, catalog_pixels): assert pixels == catalog_pixels -def test_load_catalog_small_sky(small_sky_dir): +def test_load_catalog_small_sky(small_sky_dir, small_sky_schema): """Instantiate a catalog with 1 pixel""" cat = read_from_hipscat(small_sky_dir) @@ -87,6 +88,9 @@ def test_load_catalog_small_sky(small_sky_dir): assert cat.catalog_name == "small_sky" assert len(cat.get_healpix_pixels()) == 1 + assert isinstance(cat.schema, pa.Schema) + assert cat.schema.equals(small_sky_schema) + def test_load_catalog_small_sky_order1(small_sky_order1_dir): """Instantiate a catalog with 4 pixels""" @@ -109,7 +113,7 @@ def test_load_catalog_small_sky_order1_moc(small_sky_order1_dir): assert np.all(cat.moc.flatten() == np.where(counts_skymap > 0)) -def test_load_catalog_small_sky_source(small_sky_source_dir): +def test_load_catalog_small_sky_source(small_sky_source_dir, small_sky_source_schema): """Instantiate a source catalog with 14 pixels""" cat = read_from_hipscat(small_sky_source_dir) @@ -117,6 +121,9 @@ def test_load_catalog_small_sky_source(small_sky_source_dir): assert cat.catalog_name == "small_sky_source" assert len(cat.get_healpix_pixels()) == 14 + assert isinstance(cat.schema, pa.Schema) + assert cat.schema.equals(small_sky_source_schema) + def test_max_coverage_order(small_sky_order1_catalog): assert small_sky_order1_catalog.get_max_coverage_order() >= small_sky_order1_catalog.moc.max_order diff --git a/tests/hipscat/io/conftest.py b/tests/hipscat/io/conftest.py index ef53ecad..f0fd09b1 100644 --- a/tests/hipscat/io/conftest.py +++ b/tests/hipscat/io/conftest.py @@ -3,7 +3,6 @@ import re import numpy.testing as npt -import pyarrow as pa import pytest from hipscat.io import file_io @@ -48,23 +47,6 @@ def assert_text_file_matches(expected_lines, file_name, storage_options: dict = return assert_text_file_matches -@pytest.fixture -def basic_catalog_parquet_metadata(): - return pa.schema( - [ - pa.field("id", pa.int64()), - pa.field("ra", pa.float64()), - pa.field("dec", pa.float64()), - pa.field("ra_error", pa.int64()), - pa.field("dec_error", pa.int64()), - pa.field("Norder", pa.uint8()), - pa.field("Dir", pa.uint64()), - pa.field("Npix", pa.uint64()), - pa.field("_hipscat_index", pa.uint64()), - ] - ) - - @pytest.fixture def check_parquet_schema(): def check_parquet_schema(file_name, expected_schema, expected_num_row_groups=1): diff --git a/tests/hipscat/io/test_parquet_metadata.py b/tests/hipscat/io/test_parquet_metadata.py index 355b1ee0..97115b95 100644 --- a/tests/hipscat/io/test_parquet_metadata.py +++ b/tests/hipscat/io/test_parquet_metadata.py @@ -16,9 +16,7 @@ from hipscat.pixel_math.healpix_pixel import HealpixPixel -def test_write_parquet_metadata( - tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema -): +def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema): """Copy existing catalog and create new metadata files for it""" catalog_base_dir = tmp_path / "catalog" shutil.copytree( @@ -27,27 +25,27 @@ def test_write_parquet_metadata( ) total_rows = write_parquet_metadata(catalog_base_dir) assert total_rows == 131 - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) ## Re-write - should still have the same properties. total_rows = write_parquet_metadata(catalog_base_dir) assert total_rows == 131 - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) def test_write_parquet_metadata_order1( - tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata, check_parquet_schema + tmp_path, small_sky_order1_dir, small_sky_schema, check_parquet_schema ): """Copy existing catalog and create new metadata files for it, using a catalog with multiple files.""" @@ -62,19 +60,19 @@ def test_write_parquet_metadata_order1( ## 4 row groups for 4 partitioned parquet files check_parquet_schema( temp_path / "_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 4, ) ## _common_metadata has 0 row groups check_parquet_schema( temp_path / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) def test_write_parquet_metadata_sorted( - tmp_path, small_sky_order1_dir, basic_catalog_parquet_metadata, check_parquet_schema + tmp_path, small_sky_order1_dir, small_sky_schema, check_parquet_schema ): """Copy existing catalog and create new metadata files for it, using a catalog with multiple files.""" @@ -89,13 +87,13 @@ def test_write_parquet_metadata_sorted( ## 4 row groups for 4 partitioned parquet files check_parquet_schema( temp_path / "_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 4, ) ## _common_metadata has 0 row groups check_parquet_schema( temp_path / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) diff --git a/tests/hipscat/io/test_write_metadata.py b/tests/hipscat/io/test_write_metadata.py index cfe80411..8df9bc96 100644 --- a/tests/hipscat/io/test_write_metadata.py +++ b/tests/hipscat/io/test_write_metadata.py @@ -178,9 +178,7 @@ def test_write_partition_info_float(assert_text_file_matches, tmp_path): assert_text_file_matches(expected_lines, metadata_filename) -def test_write_parquet_metadata( - tmp_path, small_sky_dir, basic_catalog_parquet_metadata, check_parquet_schema -): +def test_write_parquet_metadata(tmp_path, small_sky_dir, small_sky_schema, check_parquet_schema): """Copy existing catalog and create new metadata files for it""" catalog_base_dir = tmp_path / "catalog" shutil.copytree( @@ -188,20 +186,20 @@ def test_write_parquet_metadata( catalog_base_dir, ) io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, ) ## Re-write - should still have the same properties. io.write_parquet_metadata(catalog_base_dir) - check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata) + check_parquet_schema(catalog_base_dir / "_metadata", small_sky_schema) ## _common_metadata has 0 row groups check_parquet_schema( catalog_base_dir / "_common_metadata", - basic_catalog_parquet_metadata, + small_sky_schema, 0, )