Merge pull request #68 from astronomy-commons/delucchi/pandas_md
Write final parquet with pandas metadata.
delucchi-cmu authored Apr 20, 2023
2 parents dbb1fdd + 22f8bca commit cd602c5
Showing 28 changed files with 339 additions and 50 deletions.
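
For context (not part of this change set): "pandas metadata" refers to the `pandas` key that pandas embeds in the parquet schema when writing through pyarrow; it records which column(s) back the dataframe index so the index can be restored on read. A minimal sketch, with arbitrary data and file name:

```python
import pandas as pd
import pyarrow.parquet as pq

# Arbitrary example data and file name, for illustration only.
df = pd.DataFrame({"id": [1, 2], "ra": [10.0, 20.0]}).set_index("id")
df.to_parquet("example.parquet")

# pyarrow exposes the embedded pandas metadata on the parquet schema;
# it lists the index column(s) so pandas can rebuild the index on read.
schema = pq.read_schema("example.parquet")
print(schema.pandas_metadata["index_columns"])  # ['id']
```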
92 changes: 64 additions & 28 deletions src/hipscat_import/catalog/map_reduce.py
@@ -30,6 +30,22 @@ def _get_pixel_directory(cache_path: FilePointer, pixel: np.int64):
)


def _has_named_index(dataframe):
"""Heuristic to determine if a dataframe has some meaningful index.
This will reject dataframes with no index name for a single index,
or empty names for multi-index (e.g. [] or [None]).
"""
if dataframe.index.name is not None:
## Single index with a given name.
return True
if len(dataframe.index.names) == 0 or all(
name is None for name in dataframe.index.names
):
return False
return True
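
A quick illustration (not from the PR) of how this heuristic behaves for the two common single-index cases; multi-index dataframes follow the same pattern via `index.names`:

```python
import pandas as pd

# Illustrative dataframes only.
named = pd.DataFrame({"ra": [10.0]}, index=pd.Index([42], name="id"))
unnamed = pd.DataFrame({"ra": [10.0]})  # default RangeIndex with no name

_has_named_index(named)    # True  - single index carrying a name
_has_named_index(unnamed)  # False - unnamed index, nothing worth preserving
```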


def map_to_pixels(
input_file: FilePointer,
file_reader,
@@ -107,7 +123,10 @@ def map_to_pixels(
output_file = file_io.append_paths_to_pointer(
pixel_dir, f"shard_{shard_suffix}_{chunk_number}.parquet"
)
filtered_data.to_parquet(output_file)
if _has_named_index(filtered_data):
filtered_data.to_parquet(output_file, index=True)
else:
filtered_data.to_parquet(output_file, index=False)
del filtered_data, data_indexes

## Pesky memory!
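
The branch above only preserves the index when it carries a name. A hedged sketch of why, assuming the default pyarrow engine: filtering leaves behind a positional, unnamed index that `index=True` would serialize as a useless `__index_level_0__` column, while a named index is worth keeping:

```python
import pandas as pd

# Illustration only; data and file names are arbitrary.
data = pd.DataFrame({"id": [7, 8, 9], "ra": [1.0, 2.0, 3.0]})

filtered = data[data["ra"] > 1.0]    # positional index [1, 2], no name
filtered.to_parquet("shard.parquet", index=False)        # drop it, as the new code does

named = data.set_index("id")         # meaningful, named index
named.to_parquet("shard_named.parquet", index=True)      # keep it, recorded in pandas metadata
```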
@@ -131,6 +150,24 @@ def reduce_pixel_shards(
):
"""Reduce sharded source pixels into destination pixels.
In addition to combining multiple shards of data into a single
parquet file, this method will add a few new columns:
- `Norder` - the healpix order for the pixel
- `Dir` - the directory part, corresponding to the pixel
- `Npix` - the healpix pixel
- `_hipscat_index` - optional - a spatially-correlated
64-bit index field.
Notes on `_hipscat_index`:
- if we generate the field, we will promote any previous
*named* pandas index field(s) to a column with
that name.
- see `hipscat.pixel_math.hipscat_id`
for more in-depth discussion of this field.
Args:
cache_path (str): where to read intermediate files
origin_pixel_numbers (list[int]): high order pixels, with object
Expand All @@ -141,8 +178,12 @@ def reduce_pixel_shards(
for the catalog's final pixel
output_path (str): where to write the final catalog pixel data
id_column (str): column for survey identifier, or other sortable column
add_hipscat_index (bool): should we add a _hipscat_index column to
the resulting parquet file?
delete_input_files (bool): should we delete the intermediate files
used as input for this method.
use_schema_file (str): use the parquet schema from the indicated
parquet file.
Raises:
ValueError: if the number of rows written doesn't equal provided
@@ -181,40 +222,35 @@ def reduce_pixel_shards(
f" Expected {destination_pixel_size}, wrote {rows_written}"
)

dataframe = merged_table.to_pandas()
if id_column:
merged_table = merged_table.sort_by(id_column)
dataframe = dataframe.sort_values(id_column)
if add_hipscat_index:
merged_table = merged_table.append_column(
"_hipscat_index",
[
pixel_math.compute_hipscat_id(
merged_table[ra_column].to_pylist(),
merged_table[dec_column].to_pylist(),
)
],
dataframe["_hipscat_index"] = pixel_math.compute_hipscat_id(
dataframe[ra_column].values,
dataframe[dec_column].values,
)
merged_table = merged_table.sort_by("_hipscat_index")
merged_table = merged_table.append_column(
"Norder",
[np.full(rows_written, fill_value=destination_pixel_order, dtype=np.int32)],

dataframe["Norder"] = np.full(
rows_written, fill_value=destination_pixel_order, dtype=np.int32
)
merged_table = merged_table.append_column(
"Dir",
[
np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.int32,
)
],
dataframe["Dir"] = np.full(
rows_written,
fill_value=int(destination_pixel_number / 10_000) * 10_000,
dtype=np.int32,
)
merged_table = merged_table.append_column(
"Npix",
[np.full(rows_written, fill_value=destination_pixel_number, dtype=np.int32)],
dataframe["Npix"] = np.full(
rows_written, fill_value=destination_pixel_number, dtype=np.int32
)
pq.write_table(merged_table, where=destination_file)

del merged_table, tables
if add_hipscat_index:
## If we had a meaningful index before, preserve it as a column.
if _has_named_index(dataframe):
dataframe = dataframe.reset_index()
dataframe = dataframe.set_index("_hipscat_index").sort_index()
dataframe.to_parquet(destination_file)

del dataframe, merged_table, tables

if delete_input_files:
for pixel in origin_pixel_numbers:
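
The `Dir` value added above is the pixel number rounded down to a multiple of 10,000, so output files can be grouped into directories of at most 10,000 pixels. A small worked example with illustrative pixel numbers:

```python
# Dir = int(Npix / 10_000) * 10_000  (pixel numbers here are illustrative)
int(11 / 10_000) * 10_000       # 0     -> pixel 11 lives under Dir=0
int(34_567 / 10_000) * 10_000   # 30000 -> pixel 34567 lives under Dir=30000
```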
9 changes: 6 additions & 3 deletions tests/hipscat_import/catalog/test_file_readers.py
@@ -9,9 +9,12 @@
import pytest
from hipscat.catalog import CatalogParameters

from hipscat_import.catalog.file_readers import (CsvReader, FitsReader,
ParquetReader,
get_file_reader)
from hipscat_import.catalog.file_readers import (
CsvReader,
FitsReader,
ParquetReader,
get_file_reader,
)


def test_unknown_file_type():
40 changes: 37 additions & 3 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -4,6 +4,7 @@

import hipscat.pixel_math as hist
import numpy.testing as npt
import pandas as pd
import pyarrow as pa
import pytest

@@ -198,6 +199,7 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
destination_pixel_number=11,
destination_pixel_size=131,
output_path=tmp_path,
add_hipscat_index=True,
ra_column="ra",
dec_column="dec",
id_column="id",
@@ -210,8 +212,10 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
assert_parquet_file_ids(output_file, "id", expected_ids)


def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Test reducing into one large pixel"""
def test_reduce_hipscat_index(
parquet_shards_dir, assert_parquet_file_ids, assert_parquet_file_index, tmp_path
):
"""Test reducing with or without a _hipscat_index field"""
mr.reduce_pixel_shards(
cache_path=parquet_shards_dir,
origin_pixel_numbers=[47],
@@ -268,7 +272,37 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_p
13564690156971098112,
13557377060258709504,
]
assert_parquet_file_ids(output_file, "_hipscat_index", expected_indexes)
assert_parquet_file_index(output_file, expected_indexes)
data_frame = pd.read_parquet(output_file, engine="pyarrow")
assert data_frame.index.name == "_hipscat_index"
npt.assert_array_equal(
data_frame.columns,
["id", "ra", "dec", "ra_error", "dec_error", "Norder", "Dir", "Npix"],
)

mr.reduce_pixel_shards(
cache_path=parquet_shards_dir,
origin_pixel_numbers=[47],
destination_pixel_order=0,
destination_pixel_number=11,
destination_pixel_size=18,
output_path=tmp_path,
add_hipscat_index=False, ## different from above
ra_column="ra",
dec_column="dec",
id_column="id",
delete_input_files=False,
)

assert_parquet_file_ids(output_file, "id", expected_ids)
data_frame = pd.read_parquet(output_file, engine="pyarrow")
## No index name.
assert data_frame.index.name is None
## Data fields are the same.
npt.assert_array_equal(
data_frame.columns,
["id", "ra", "dec", "ra_error", "dec_error", "Norder", "Dir", "Npix"],
)


def test_reduce_bad_expectation(parquet_shards_dir, tmp_path):
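
The test above relies on a new `assert_parquet_file_index` fixture whose implementation is not shown in this diff. Purely as a hypothetical sketch, it would presumably read the file with pyarrow and compare the resulting pandas index values:

```python
import numpy.testing as npt
import pandas as pd

def assert_parquet_file_index(file_name, expected_values):
    """Hypothetical sketch -- the real fixture lives outside this diff."""
    data_frame = pd.read_parquet(file_name, engine="pyarrow")
    npt.assert_array_equal(data_frame.index.values, expected_values)
```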
26 changes: 14 additions & 12 deletions tests/hipscat_import/catalog/test_resume_files.py
@@ -4,18 +4,20 @@
import numpy.testing as npt
import pytest

from hipscat_import.catalog.resume_files import (clean_resume_files,
is_mapping_done,
is_reducing_done,
read_histogram,
read_mapping_keys,
read_reducing_keys,
set_mapping_done,
set_reducing_done,
write_histogram,
write_mapping_done_key,
write_mapping_start_key,
write_reducing_key)
from hipscat_import.catalog.resume_files import (
clean_resume_files,
is_mapping_done,
is_reducing_done,
read_histogram,
read_mapping_keys,
read_reducing_keys,
set_mapping_done,
set_reducing_done,
write_histogram,
write_mapping_done_key,
write_mapping_start_key,
write_reducing_key,
)


def test_mapping_done(tmp_path):
Expand Down
