Return the total number of rows written in metadata. (#306)
* Return the total number of rows written in metadata.

* Propagate returned value.
delucchi-cmu authored Jul 19, 2024
1 parent b02ac54 commit 96fa40b
Showing 7 changed files with 44 additions and 15 deletions.
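Taken together, the change means every write_to_metadata_files / write_parquet_metadata call now reports how many rows its _metadata file describes. A minimal sketch of the new contract, assuming hipscat's public PartitionInfo and HealpixPixel APIs (the pixel values and temporary directory are illustrative, not part of this commit):

    import tempfile

    from hipscat.catalog import PartitionInfo
    from hipscat.pixel_math import HealpixPixel

    # Hypothetical two-partition catalog; any list of HealpixPixel works.
    pixels = [HealpixPixel(1, 44), HealpixPixel(1, 45)]
    info = PartitionInfo.from_healpix(pixels)

    with tempfile.TemporaryDirectory() as catalog_dir:
        # As of this commit, the write reports the row total recorded in
        # _metadata -- for partition info, one row per HEALPix partition.
        total_rows = info.write_to_metadata_files(catalog_path=catalog_dir)
        assert total_rows == len(pixels)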
@@ -77,6 +77,9 @@ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
             catalog_path (FilePointer): base path for the catalog
             storage_options (dict): dictionary that contains abstract filesystem credentials
+        Returns:
+            sum of the number of rows in the dataset.
         Raises:
             ValueError: if no path is provided, and could not be inferred.
         """
@@ -101,7 +104,7 @@ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
                 for primary_pixel, join_pixels in self.primary_to_join_map().items()
             ]

-        write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
+        return write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

     def write_to_csv(self, catalog_path: FilePointer = None, storage_options: dict = None):
         """Write all partition data to CSV files.
5 changes: 4 additions & 1 deletion src/hipscat/catalog/partition_info.py
@@ -87,6 +87,9 @@ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
             catalog_path (FilePointer): base path for the catalog
             storage_options (dict): dictionary that contains abstract filesystem credentials
+        Returns:
+            sum of the number of rows in the dataset.
         Raises:
             ValueError: if no path is provided, and could not be inferred.
         """
@@ -109,7 +112,7 @@ def write_to_metadata_files(self, catalog_path: FilePointer = None, storage_options: dict = None):
                 for pixel in self.get_healpix_pixels()
             ]

-        write_parquet_metadata_for_batches(batches, catalog_path, storage_options)
+        return write_parquet_metadata_for_batches(batches, catalog_path, storage_options)

     @classmethod
     def read_from_dir(cls, catalog_base_dir: FilePointer, storage_options: dict = None) -> PartitionInfo:
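The returned total also composes with the existing round-trip flow exercised in the tests further down; a hedged sketch under the same assumptions as above, using paths.get_parquet_metadata_pointer and PartitionInfo.read_from_file as the tests do:

    import tempfile

    from hipscat.catalog import PartitionInfo
    from hipscat.io import paths
    from hipscat.pixel_math import HealpixPixel

    info = PartitionInfo.from_healpix([HealpixPixel(0, 11)])

    with tempfile.TemporaryDirectory() as catalog_dir:
        total_rows = info.write_to_metadata_files(catalog_path=catalog_dir)

        # Read the partition list back from the _metadata file just written;
        # the reported row total should match the partition count.
        metadata_pointer = paths.get_parquet_metadata_pointer(catalog_dir)
        round_trip = PartitionInfo.read_from_file(metadata_pointer)
        assert total_rows == len(round_trip.get_healpix_pixels())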
11 changes: 10 additions & 1 deletion src/hipscat/io/parquet_metadata.py
@@ -88,6 +88,9 @@ def write_parquet_metadata(
         storage_options: dictionary that contains abstract filesystem credentials
         output_path (str): base path for writing out metadata files
             defaults to `catalog_path` if unspecified
+    Returns:
+        sum of the number of rows in the dataset.
     """
     ignore_prefixes = [
         "intermediate",
@@ -104,6 +107,7 @@ def write_parquet_metadata(
     metadata_collector = []
     # Collect the healpix pixels so we can sort before writing.
     healpix_pixels = []
+    total_rows = 0

     for hips_file in dataset.files:
         hips_file_pointer = file_io.get_file_pointer_from_path(hips_file, include_protocol=catalog_path)
@@ -120,6 +124,7 @@ def write_parquet_metadata(

         healpix_pixels.append(healpix_pixel)
         metadata_collector.append(single_metadata)
+        total_rows += single_metadata.num_rows

     ## Write out the two metadata files
     if output_path is None:
@@ -141,6 +146,7 @@ def write_parquet_metadata(
     file_io.write_parquet_metadata(
         dataset.schema, common_metadata_file_pointer, storage_options=storage_options
     )
+    return total_rows


 def write_parquet_metadata_for_batches(
@@ -156,13 +162,16 @@ def write_parquet_metadata_for_batches(
         output_path (str): base path for writing out metadata files
             defaults to `catalog_path` if unspecified
         storage_options: dictionary that contains abstract filesystem credentials
+    Returns:
+        sum of the number of rows in the dataset.
     """

     with tempfile.TemporaryDirectory() as temp_pq_file:
         for batch_list in batches:
             temp_info_table = pa.Table.from_batches(batch_list)
             pq.write_to_dataset(temp_info_table, temp_pq_file)
-        write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path)
+        return write_parquet_metadata(temp_pq_file, storage_options=storage_options, output_path=output_path)


 def read_row_group_fragments(metadata_file: str, storage_options: dict = None):
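The counting above leans on the parquet footer: each file's FileMetaData already records num_rows, so the dataset total is available without reading any data pages. A self-contained sketch of that technique in plain pyarrow (no hipscat wrappers; the file layout is illustrative):

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as pds
    import pyarrow.parquet as pq

    with tempfile.TemporaryDirectory() as data_dir:
        # Two small parquet files standing in for catalog partitions.
        pq.write_to_dataset(pa.table({"id": [1, 2, 3]}), data_dir)
        pq.write_to_dataset(pa.table({"id": [4, 5]}), data_dir)

        dataset = pds.dataset(data_dir, format="parquet")
        total_rows = 0
        for data_file in dataset.files:
            metadata = pq.read_metadata(data_file)  # reads only the footer
            total_rows += metadata.num_rows
        assert total_rows == 5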
@@ -87,7 +87,8 @@ def test_primary_to_join_map(association_catalog_join_pixels):
 def test_metadata_file_round_trip(association_catalog_join_pixels, tmp_path):
     info = PartitionJoinInfo(association_catalog_join_pixels)
     pd.testing.assert_frame_equal(info.data_frame, association_catalog_join_pixels)
-    info.write_to_metadata_files(tmp_path)
+    total_rows = info.write_to_metadata_files(tmp_path)
+    assert total_rows == 4

     file_pointer = file_io.get_file_pointer_from_path(tmp_path / "_metadata")
     new_info = PartitionJoinInfo.read_from_file(file_pointer)
@@ -138,5 +139,8 @@ def test_load_partition_info_from_dir_and_write(tmp_path, association_catalog_join_pixels):
     ## Can write out the _metadata file by providing:
     ## - no arguments
     ## - new catalog directory
-    info.write_to_metadata_files()
-    info.write_to_metadata_files(catalog_path=tmp_path)
+    total_rows = info.write_to_metadata_files()
+    assert total_rows == 4
+
+    total_rows = info.write_to_metadata_files(catalog_path=tmp_path)
+    assert total_rows == 4
9 changes: 6 additions & 3 deletions tests/hipscat/catalog/test_partition_info.py
@@ -124,7 +124,8 @@ def test_write_to_file_sorted(tmp_path, pixel_list_depth_first, pixel_list_breadth_first):
     even though the original pixel list is in Norder-major sorting (depth-first)."""
     partition_info = PartitionInfo.from_healpix(pixel_list_depth_first)
     npt.assert_array_equal(pixel_list_depth_first, partition_info.get_healpix_pixels())
-    partition_info.write_to_metadata_files(tmp_path)
+    total_rows = partition_info.write_to_metadata_files(tmp_path)
+    assert total_rows == 9

     partition_info_pointer = paths.get_parquet_metadata_pointer(tmp_path)
     new_partition_info = PartitionInfo.read_from_file(partition_info_pointer)
@@ -155,5 +156,7 @@ def test_load_partition_info_from_dir_and_write(tmp_path, pixel_list_depth_first):
     ## Can write out the _metadata file by providing:
     ## - no arguments
     ## - new catalog directory
-    info.write_to_metadata_files()
-    info.write_to_metadata_files(catalog_path=tmp_path)
+    total_rows = info.write_to_metadata_files()
+    assert total_rows == 9
+    total_rows = info.write_to_metadata_files(catalog_path=tmp_path)
+    assert total_rows == 9
16 changes: 11 additions & 5 deletions tests/hipscat/io/test_parquet_metadata.py
@@ -25,7 +25,8 @@ def test_write_parquet_metadata(
         small_sky_dir,
         catalog_base_dir,
     )
-    write_parquet_metadata(catalog_base_dir)
+    total_rows = write_parquet_metadata(catalog_base_dir)
+    assert total_rows == 131
     check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata)
     ## _common_metadata has 0 row groups
     check_parquet_schema(
@@ -34,7 +35,8 @@ def test_write_parquet_metadata(
         0,
     )
     ## Re-write - should still have the same properties.
-    write_parquet_metadata(catalog_base_dir)
+    total_rows = write_parquet_metadata(catalog_base_dir)
+    assert total_rows == 131
     check_parquet_schema(catalog_base_dir / "_metadata", basic_catalog_parquet_metadata)
     ## _common_metadata has 0 row groups
     check_parquet_schema(
@@ -55,7 +57,8 @@ def test_write_parquet_metadata_order1(
         temp_path,
     )

-    write_parquet_metadata(temp_path)
+    total_rows = write_parquet_metadata(temp_path)
+    assert total_rows == 131
     ## 4 row groups for 4 partitioned parquet files
     check_parquet_schema(
         temp_path / "_metadata",
@@ -81,7 +84,8 @@ def test_write_parquet_metadata_sorted(
         temp_path,
     )

-    write_parquet_metadata(temp_path)
+    total_rows = write_parquet_metadata(temp_path)
+    assert total_rows == 131
     ## 4 row groups for 4 partitioned parquet files
     check_parquet_schema(
         temp_path / "_metadata",
@@ -112,7 +116,9 @@ def test_write_index_parquet_metadata(tmp_path, check_parquet_schema):
         ]
     )

-    write_parquet_metadata(temp_path, order_by_healpix=False)
+    total_rows = write_parquet_metadata(temp_path, order_by_healpix=False)
+    assert total_rows == 2

     check_parquet_schema(tmp_path / "index" / "_metadata", index_catalog_parquet_metadata)
     ## _common_metadata has 0 row groups
     check_parquet_schema(
3 changes: 2 additions & 1 deletion tests/hipscat/io/test_validation.py
@@ -19,7 +19,8 @@ def test_is_valid_catalog(tmp_path, small_sky_catalog, small_sky_pixels):

     # The catalog is valid if both the catalog_info and _metadata files exist,
     # and the catalog_info is in a valid format
-    PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(catalog_dir_pointer)
+    total_rows = PartitionInfo.from_healpix(small_sky_pixels).write_to_metadata_files(catalog_dir_pointer)
+    assert total_rows == 1
     assert is_valid_catalog(catalog_dir_pointer)

     # A partition_info file alone is also not enough
