Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store arrow schema when reading catalogs #310

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Any, Dict, Tuple, Union

import pandas as pd
import pyarrow as pa
from mocpy import MOC
from typing_extensions import TypeAlias

Expand Down Expand Up @@ -35,11 +36,14 @@ def __init__(
join_pixels: JoinPixelInputTypes,
catalog_path=None,
moc: MOC | None = None,
schema: pa.Schema | None = None,
storage_options: Union[Dict[Any, Any], None] = None,
) -> None:
if not catalog_info.catalog_type == CatalogType.ASSOCIATION:
raise ValueError("Catalog info `catalog_type` must be 'association'")
super().__init__(catalog_info, pixels, catalog_path, moc=moc, storage_options=storage_options)
super().__init__(
catalog_info, pixels, catalog_path, moc=moc, schema=schema, storage_options=storage_options
)
self.join_info = self._get_partition_join_info_from_pixels(join_pixels)

def get_join_pixels(self) -> pd.DataFrame:
Expand Down
12 changes: 10 additions & 2 deletions src/hipscat/catalog/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any, Dict, List, Tuple, Union

import numpy as np
import pyarrow as pa
from mocpy import MOC
from typing_extensions import TypeAlias

Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(
pixels: PixelInputTypes,
catalog_path: str = None,
moc: MOC | None = None,
schema: pa.Schema | None = None,
storage_options: Union[Dict[Any, Any], None] = None,
) -> None:
"""Initializes a Catalog
Expand All @@ -56,16 +58,22 @@ def __init__(
list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object
catalog_path: If the catalog is stored on disk, specify the location of the catalog
Does not load the catalog from this path, only store as metadata
storage_options: dictionary that contains abstract filesystem credentials
moc (mocpy.MOC): MOC object representing the coverage of the catalog
schema (pa.Schema): The pyarrow schema for the catalog
storage_options: dictionary that contains abstract filesystem credentials
"""
if catalog_info.catalog_type not in self.HIPS_CATALOG_TYPES:
raise ValueError(
f"Catalog info `catalog_type` must be one of "
f"{', '.join([t.value for t in self.HIPS_CATALOG_TYPES])}"
)
super().__init__(
catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options
catalog_info,
pixels,
catalog_path=catalog_path,
moc=moc,
schema=schema,
storage_options=storage_options,
)

def filter_by_cone(self, ra: float, dec: float, radius_arcsec: float) -> Catalog:
Expand Down
21 changes: 19 additions & 2 deletions src/hipscat/catalog/healpix_dataset/healpix_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import numpy as np
import pandas as pd
import pyarrow as pa
from mocpy import MOC
from typing_extensions import Self, TypeAlias

import hipscat.pixel_math.healpix_shim as hp
from hipscat.catalog.dataset import BaseCatalogInfo, Dataset
from hipscat.catalog.partition_info import PartitionInfo
from hipscat.io import FilePointer, file_io, paths
from hipscat.io.file_io import read_parquet_metadata
from hipscat.pixel_math import HealpixPixel
from hipscat.pixel_tree import PixelAlignment, PixelAlignmentType
from hipscat.pixel_tree.moc_filter import filter_by_moc
Expand Down Expand Up @@ -39,6 +41,7 @@ def __init__(
pixels: PixelInputTypes,
catalog_path: str = None,
moc: MOC | None = None,
schema: pa.Schema | None = None,
storage_options: Union[Dict[Any, Any], None] = None,
) -> None:
"""Initializes a Catalog
Expand All @@ -49,13 +52,15 @@ def __init__(
list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object
catalog_path: If the catalog is stored on disk, specify the location of the catalog
Does not load the catalog from this path, only store as metadata
storage_options: dictionary that contains abstract filesystem credentials
moc (mocpy.MOC): MOC object representing the coverage of the catalog
schema (pa.Schema): The pyarrow schema for the catalog
storage_options: dictionary that contains abstract filesystem credentials
"""
super().__init__(catalog_info, catalog_path=catalog_path, storage_options=storage_options)
self.partition_info = self._get_partition_info_from_pixels(pixels)
self.pixel_tree = self._get_pixel_tree_from_pixels(pixels)
self.moc = moc
self.schema = schema

def get_healpix_pixels(self) -> List[HealpixPixel]:
"""Get healpix pixel objects for all pixels contained in the catalog.
Expand Down Expand Up @@ -101,6 +106,7 @@ def _read_kwargs(
) -> dict:
kwargs = super()._read_kwargs(catalog_base_dir, storage_options=storage_options)
kwargs["moc"] = cls._read_moc_from_point_map(catalog_base_dir, storage_options)
kwargs["schema"] = cls._read_schema_from_metadata(catalog_base_dir, storage_options)
return kwargs

@classmethod
Expand All @@ -118,6 +124,17 @@ def _read_moc_from_point_map(
orders = np.full(ipix.shape, order)
return MOC.from_healpix_cells(ipix, orders, order)

@classmethod
def _read_schema_from_metadata(
    cls, catalog_base_dir: FilePointer, storage_options: dict | None = None
) -> pa.Schema | None:
    """Load the arrow schema from the catalog's common metadata file, if it exists.

    Args:
        catalog_base_dir: catalog location handed to `paths.get_common_metadata_pointer`
            to find the common metadata file
        storage_options: dictionary that contains abstract filesystem credentials

    Returns:
        The pyarrow schema converted from the parquet metadata, or None when the
        common metadata file is not found.
    """
    metadata_pointer = paths.get_common_metadata_pointer(catalog_base_dir)
    has_metadata = file_io.does_file_or_directory_exist(metadata_pointer, storage_options=storage_options)
    # Without a common metadata file there is nothing to read a schema from
    if not has_metadata:
        return None
    parquet_metadata = read_parquet_metadata(metadata_pointer, storage_options=storage_options)
    return parquet_metadata.schema.to_arrow_schema()

@classmethod
def _check_files_exist(cls, catalog_base_dir: FilePointer, storage_options: dict = None):
super()._check_files_exist(catalog_base_dir, storage_options=storage_options)
Expand Down Expand Up @@ -170,7 +187,7 @@ def filter_by_moc(self, moc: MOC) -> Self:
filtered_tree = filter_by_moc(self.pixel_tree, moc)
filtered_moc = self.moc.intersection(moc) if self.moc is not None else None
filtered_catalog_info = dataclasses.replace(self.catalog_info, total_rows=None)
return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc)
return self.__class__(filtered_catalog_info, filtered_tree, moc=filtered_moc, schema=self.schema)

def align(
self, other_cat: Self, alignment_type: PixelAlignmentType = PixelAlignmentType.INNER
Expand Down
12 changes: 10 additions & 2 deletions src/hipscat/catalog/margin_cache/margin_catalog.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import pyarrow as pa
from mocpy import MOC
from typing_extensions import Self, TypeAlias

Expand Down Expand Up @@ -30,6 +31,7 @@ def __init__(
pixels: PixelInputTypes,
catalog_path: str = None,
moc: MOC | None = None,
schema: pa.Schema | None = None,
storage_options: dict | None = None,
) -> None:
"""Initializes a Margin Catalog
Expand All @@ -40,13 +42,19 @@ def __init__(
list of HealpixPixel, `PartitionInfo object`, or a `PixelTree` object
catalog_path: If the catalog is stored on disk, specify the location of the catalog
Does not load the catalog from this path, only store as metadata
storage_options: dictionary that contains abstract filesystem credentials
moc (mocpy.MOC): MOC object representing the coverage of the catalog
schema (pa.Schema): The pyarrow schema for the catalog
storage_options: dictionary that contains abstract filesystem credentials
"""
if catalog_info.catalog_type != CatalogType.MARGIN:
raise ValueError(f"Catalog info `catalog_type` must equal {CatalogType.MARGIN}")
super().__init__(
catalog_info, pixels, catalog_path=catalog_path, moc=moc, storage_options=storage_options
catalog_info,
pixels,
catalog_path=catalog_path,
moc=moc,
schema=schema,
storage_options=storage_options,
)

def filter_by_moc(self, moc: MOC) -> Self:
Expand Down