Pull resume logic into helper class. #95

Merged
15 commits merged on Jul 12, 2023
15 changes: 1 addition & 14 deletions src/hipscat_import/catalog/__init__.py
@@ -9,18 +9,5 @@
get_file_reader,
)
from .map_reduce import map_to_pixels, reduce_pixel_shards, split_pixels
from .resume_files import (
clean_resume_files,
is_mapping_done,
is_reducing_done,
read_histogram,
read_mapping_keys,
read_reducing_keys,
set_mapping_done,
set_reducing_done,
write_histogram,
write_mapping_done_key,
write_mapping_start_key,
write_reducing_key,
)
from .resume_plan import ResumePlan
from .run_import import run
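
Note: the dozen `resume_files` helpers above are replaced by a single `ResumePlan` class. The new `resume_plan.py` is not shown in this diff view; the following is a minimal sketch, assuming only the pieces of its interface that appear elsewhere in this PR (the constructor arguments used in `arguments.py` and the `write_partial_histogram` call used in `map_reduce.py`). The on-disk layout in the sketch is an assumption, not the PR's actual implementation.

```python
# Minimal sketch of the ResumePlan surface used elsewhere in this diff.
# Only names visible in this excerpt are real; the storage layout below
# (one .npz file per mapping key) is an assumption for illustration.
import os
from dataclasses import dataclass, field
from typing import List

import numpy as np


@dataclass
class ResumePlan:
    """Container that handles read/write of log files for this pipeline."""

    resume: bool = False
    progress_bar: bool = True
    input_paths: List[str] = field(default_factory=list)
    tmp_path: str = ""

    @classmethod
    def write_partial_histogram(cls, tmp_path, mapping_key, histogram):
        """Persist one worker's partial histogram so an interrupted run can resume."""
        histogram_dir = os.path.join(tmp_path, "histograms")
        os.makedirs(histogram_dir, exist_ok=True)
        np.savez(os.path.join(histogram_dir, f"{mapping_key}.npz"), histogram=histogram)
```

It is shown as a classmethod only because `map_reduce.py` below calls it on the class itself.
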
48 changes: 15 additions & 33 deletions src/hipscat_import/catalog/arguments.py
@@ -3,14 +3,14 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, List
from typing import List

import pandas as pd
from hipscat.catalog.catalog import CatalogInfo
from hipscat.io import FilePointer, file_io
from hipscat.pixel_math import hipscat_id

from hipscat_import.catalog.file_readers import InputReader, get_file_reader
from hipscat_import.catalog.resume_plan import ResumePlan
from hipscat_import.runtime_arguments import RuntimeArguments

# pylint: disable=too-many-locals,too-many-arguments,too-many-instance-attributes,too-many-branches,too-few-public-methods
@@ -68,16 +68,13 @@ class ImportArguments(RuntimeArguments):
debug_stats_only: bool = False
"""do not perform a map reduce and don't create a new
catalog. generate the partition info"""
filter_function: Callable | None = None
"""optional method which takes a pandas dataframe as input, performs some
filtering or transformation of the data, and returns a dataframe with the
rows that will be used to create the new catalog"""
file_reader: InputReader | None = None
"""instance of input reader that specifies arguments necessary for reading
from your input files"""
resume_plan: ResumePlan | None = None
"""container that handles read/write of log files for this pipeline"""

def __post_init__(self):

self._check_arguments()

def _check_arguments(self):
@@ -97,7 +94,9 @@ def _check_arguments(self):
self.highest_healpix_order, "highest_healpix_order"
)
if not 100 <= self.pixel_threshold <= 1_000_000_000:
raise ValueError("pixel_threshold should be between 100 and 1,000,000,000")
raise ValueError(
"pixel_threshold should be between 100 and 1,000,000,000"
)
self.mapping_healpix_order = self.highest_healpix_order

if self.catalog_type not in ("source", "object"):
@@ -117,28 +116,16 @@ def _check_arguments(self):
self.input_paths = file_io.find_files_matching_path(
self.input_path, f"*{self.input_format}"
)

if len(self.input_paths) == 0:
raise FileNotFoundError(
f"No files matched file pattern: {self.input_path}*{self.input_format} "
)
elif self.input_file_list:
self.input_paths = self.input_file_list
for test_path in self.input_paths:
if not file_io.does_file_or_directory_exist(test_path):
raise FileNotFoundError(f"{test_path} not found on local storage")
self.input_paths.sort()

if not self.resume:
if file_io.directory_has_contents(self.tmp_path):
raise ValueError(
f"tmp_path ({self.tmp_path}) contains intermediate files."
" choose a different directory or use --resume flag"
)
file_io.make_directory(self.tmp_path, exist_ok=True)

if not self.filter_function:
self.filter_function = passthrough_filter_function
if len(self.input_paths) == 0:
raise FileNotFoundError("No input files found")
self.resume_plan = ResumePlan(
resume=self.resume,
progress_bar=self.progress_bar,
input_paths=self.input_paths,
tmp_path=self.tmp_path,
)

def to_catalog_info(self, total_rows) -> CatalogInfo:
"""Catalog-type-specific dataset info."""
@@ -200,8 +187,3 @@ def check_healpix_order_range(
raise ValueError(
f"{field_name} should be between {lower_bound} and {upper_bound}"
)


def passthrough_filter_function(data: pd.DataFrame) -> pd.DataFrame:
"""No-op filter function to be used when no user-defined filter is provided"""
return data
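
With `filter_function` gone, row filtering is no longer configured through `ImportArguments`; the arguments object now only validates inputs and builds the `ResumePlan` in `__post_init__`. A hedged usage sketch follows; the paths, reader options, and order/threshold values are placeholders, and the `output_*` argument names come from the library's runtime arguments rather than from this diff.

```python
from hipscat_import.catalog.arguments import ImportArguments
from hipscat_import.catalog.file_readers import get_file_reader

# Placeholder values throughout; only the shape of the call reflects this PR.
args = ImportArguments(
    output_catalog_name="my_catalog",
    input_path="/data/input",
    input_format="csv",
    file_reader=get_file_reader("csv"),
    output_path="/data/output",
    highest_healpix_order=7,
    pixel_threshold=1_000_000,
    resume=False,
)

# __post_init__ has already validated the inputs and attached the resume plan.
print(args.resume_plan.tmp_path)
```
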
18 changes: 18 additions & 0 deletions src/hipscat_import/catalog/file_readers.py
@@ -6,6 +6,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from astropy.table import Table
from hipscat.io import file_io

# pylint: disable=too-few-public-methods,too-many-arguments

@@ -87,6 +88,19 @@ def provenance_info(self) -> dict:
dictionary with all argument_name -> argument_value as key -> value pairs.
"""

def regular_file_exists(self, input_file):
"""Check that the `input_file` points to a single regular file

Raises
FileNotFoundError: if nothing exists at path, or directory found.
"""
if not file_io.does_file_or_directory_exist(input_file):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file):
raise FileNotFoundError(
f"Directory found at path - requires regular file: {input_file}"
)


class CsvReader(InputReader):
"""CSV reader for the most common CSV reading arguments.
@@ -125,6 +139,8 @@ def __init__(
self.kwargs = kwargs

def read(self, input_file):
self.regular_file_exists(input_file)

if self.schema_file:
schema_parquet = pd.read_parquet(
self.schema_file, dtype_backend="numpy_nullable"
@@ -206,6 +222,7 @@ def __init__(
self.kwargs = kwargs

def read(self, input_file):
self.regular_file_exists(input_file)
table = Table.read(input_file, memmap=True, **self.kwargs)
if self.column_names:
table.keep_columns(self.column_names)
@@ -243,6 +260,7 @@ def __init__(self, chunksize=500_000, **kwargs):
self.kwargs = kwargs

def read(self, input_file):
self.regular_file_exists(input_file)
parquet_file = pq.read_table(input_file, **self.kwargs)
for smaller_table in parquet_file.to_batches(max_chunksize=self.chunksize):
yield pa.Table.from_batches([smaller_table]).to_pandas()
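
The new `regular_file_exists` hook lives on the `InputReader` base class, so the built-in readers above and any user-defined reader can validate their input the same way before parsing. A sketch of a hypothetical subclass using it (`TsvReader` is not part of this PR; the chunking scheme is illustrative):

```python
import pandas as pd

from hipscat_import.catalog.file_readers import InputReader


class TsvReader(InputReader):
    """Hypothetical tab-separated reader showing where the validation hook fits."""

    def __init__(self, chunksize=500_000, **kwargs):
        self.chunksize = chunksize
        self.kwargs = kwargs

    def read(self, input_file):
        # Fail fast with the same FileNotFoundError behavior as the built-in readers.
        self.regular_file_exists(input_file)
        for chunk in pd.read_csv(input_file, sep="\t", chunksize=self.chunksize, **self.kwargs):
            yield chunk

    def provenance_info(self) -> dict:
        return {"input_reader_type": "TsvReader", "chunksize": self.chunksize}
```
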
26 changes: 9 additions & 17 deletions src/hipscat_import/catalog/map_reduce.py
@@ -8,6 +8,7 @@
from hipscat.io import FilePointer, file_io, paths

from hipscat_import.catalog.file_readers import InputReader
from hipscat_import.catalog.resume_plan import ResumePlan

# pylint: disable=too-many-locals,too-many-arguments

@@ -54,15 +55,8 @@ def _iterate_input_file(
highest_order,
ra_column,
dec_column,
filter_function=None,
):
"""Helper function to handle input file reading and healpix pixel calculation"""
if not file_io.does_file_or_directory_exist(input_file):
raise FileNotFoundError(f"File not found at path: {input_file}")
if not file_io.is_regular_file(input_file):
raise FileNotFoundError(
f"Directory found at path - requires regular file: {input_file}"
)
if not file_reader:
raise NotImplementedError("No file reader implemented")

@@ -73,9 +67,7 @@
raise ValueError(
f"Invalid column names in input file: {ra_column}, {dec_column} not in {input_file}"
)
# Set up the data we want (filter and find pixel)
if filter_function:
data = filter_function(data)
# Set up the pixel data
mapped_pixels = hp.ang2pix(
2**highest_order,
data[ra_column].values,
@@ -89,10 +81,11 @@
def map_to_pixels(
input_file: FilePointer,
file_reader: InputReader,
cache_path: FilePointer,
mapping_key,
highest_order,
ra_column,
dec_column,
filter_function=None,
):
"""Map a file of input objects to their healpix pixels.

@@ -115,11 +108,13 @@
"""
histo = pixel_math.empty_histogram(highest_order)
for _, _, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, filter_function
input_file, file_reader, highest_order, ra_column, dec_column
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
return histo
ResumePlan.write_partial_histogram(
tmp_path=cache_path, mapping_key=mapping_key, histogram=histo
)


def split_pixels(
@@ -131,7 +126,6 @@
dec_column,
cache_path: FilePointer,
alignment=None,
filter_function=None,
):
"""Map a file of input objects to their healpix pixels and split into shards.

@@ -145,15 +139,13 @@
ra_column (str): where to find right ascension data in the dataframe
dec_column (str): where to find declination in the dataframe
cache_path (FilePointer): where to write intermediate files.
filter_function (function pointer): method to perform some filtering
or transformation of the input data

Raises:
ValueError: if the `ra_column` or `dec_column` cannot be found in the input file.
FileNotFoundError: if the file does not exist, or is a directory
"""
for chunk_number, data, mapped_pixels in _iterate_input_file(
input_file, file_reader, highest_order, ra_column, dec_column, filter_function
input_file, file_reader, highest_order, ra_column, dec_column
):
aligned_pixels = alignment[mapped_pixels]
unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)
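
With this change, `map_to_pixels` no longer returns its histogram; each worker writes a partial histogram keyed by `mapping_key` into the cache directory via `ResumePlan.write_partial_histogram`, and the pipeline aggregates the partials afterward. A driver-loop sketch under the same assumptions as the `ResumePlan` stub above (the real pipeline runs these workers in parallel; paths, keys, and the aggregation layout are placeholders):

```python
import glob
import os

import numpy as np

from hipscat_import.catalog.file_readers import get_file_reader
from hipscat_import.catalog.map_reduce import map_to_pixels

input_paths = ["/data/input/part_0.csv", "/data/input/part_1.csv"]  # placeholders
cache_path = "/tmp/my_catalog_intermediate"                         # placeholder

for index, input_file in enumerate(input_paths):
    map_to_pixels(
        input_file=input_file,
        file_reader=get_file_reader("csv"),
        cache_path=cache_path,
        mapping_key=f"map_{index}",
        highest_order=7,
        ra_column="ra",
        dec_column="dec",
    )

# Aggregation sketch: sum whatever partials were written. The on-disk layout
# (histograms/*.npz) is the assumption from the ResumePlan stub, not the PR's.
partials = [np.load(path)["histogram"] for path in glob.glob(os.path.join(cache_path, "histograms", "*.npz"))]
total_histogram = np.sum(partials, axis=0) if partials else None
```
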
140 changes: 0 additions & 140 deletions src/hipscat_import/catalog/resume_files.py

This file was deleted.
