User arguments to retain intermediate files (#311)
delucchi-cmu authored May 16, 2024
1 parent 94c069b commit 06b8379
Showing 4 changed files with 99 additions and 5 deletions.
8 changes: 8 additions & 0 deletions src/hipscat_import/catalog/arguments.py
@@ -68,6 +68,13 @@ class ImportArguments(RuntimeArguments):
     """healpix order to use when mapping. will be
     ``highest_healpix_order`` unless a positive value is provided for
     ``constant_healpix_order``"""
+    delete_intermediate_parquet_files: bool = True
+    """should we delete the smaller intermediate parquet files generated in the
+    splitting stage, once the relevant reducing stage is complete?"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all sub-histograms from the mapping stage, and all
+    done marker files at the end of the pipeline."""
     debug_stats_only: bool = False
     """do not perform a map reduce and don't create a new
     catalog. generate the partition info"""
@@ -125,6 +132,7 @@ def _check_arguments(self):
             progress_bar=self.progress_bar,
             input_paths=self.input_paths,
             tmp_path=self.resume_tmp,
+            delete_resume_log_files=self.delete_resume_log_files,
         )
 
     def to_catalog_info(self, total_rows) -> CatalogInfo:
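For orientation, a minimal sketch of how a caller might opt in to keeping everything via the two new flags; the input and output paths are illustrative placeholders, and the remaining keyword arguments mirror the test added at the bottom of this commit:

    from hipscat_import.catalog.arguments import ImportArguments

    # Retain all intermediate artifacts, e.g. while debugging a pipeline run.
    # Input/output paths here are placeholders, not part of this commit.
    args = ImportArguments(
        output_artifact_name="my_catalog",
        input_path="/data/my_survey_parts",
        file_reader="csv",
        output_path="/catalogs",
        delete_intermediate_parquet_files=False,  # keep splitting-stage parquet shards
        delete_resume_log_files=False,  # keep sub-histograms and done marker files
    )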
13 changes: 9 additions & 4 deletions src/hipscat_import/catalog/resume_plan.py
@@ -26,6 +26,10 @@ class ResumePlan(PipelineResumePlan):
     """set of files (and job keys) that have yet to be split"""
     destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
     """Fully resolved map of destination pixels to constituent smaller pixels"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all sub-histograms from the mapping stage, and all
+    done marker files at the end of the pipeline."""
 
     MAPPING_STAGE = "mapping"
     SPLITTING_STAGE = "splitting"
@@ -119,10 +123,11 @@ def read_histogram(self, healpix_order):
                 aggregate_histogram.add(SparseHistogram.from_file(partial_file_name))
 
             aggregate_histogram.to_file(file_name)
-            file_io.remove_directory(
-                file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
-                ignore_errors=True,
-            )
+            if self.delete_resume_log_files:
+                file_io.remove_directory(
+                    file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                    ignore_errors=True,
+                )
 
         full_histogram = SparseHistogram.from_file(file_name).to_array()
 
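The _check_arguments hunk in arguments.py forwards the flag when the plan is built. A standalone sketch of that hand-off, assuming ResumePlan accepts the same keyword arguments passed there (the input list and paths are placeholders):

    from hipscat_import.catalog.resume_plan import ResumePlan

    # tmp_path would normally be args.resume_tmp; values here are illustrative.
    plan = ResumePlan(
        progress_bar=False,
        input_paths=["part_0.csv", "part_1.csv"],
        tmp_path="/tmp/my_catalog/intermediate",
        delete_resume_log_files=False,  # keep histograms/ after aggregation
    )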
4 changes: 3 additions & 1 deletion src/hipscat_import/catalog/run_import.py
@@ -98,6 +98,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
                 add_hipscat_index=args.add_hipscat_index,
                 use_schema_file=args.use_schema_file,
                 use_hipscat_index=args.use_hipscat_index,
+                delete_input_files=args.delete_intermediate_parquet_files,
                 storage_options=args.output_storage_options,
             )
         )
@@ -182,5 +183,6 @@ def run(args, client):
         step_progress.update(1)
         io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options)
         step_progress.update(1)
-        args.resume_plan.clean_resume_files()
+        if args.resume_plan.delete_resume_log_files:
+            args.resume_plan.clean_resume_files()
         step_progress.update(1)
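Note the asymmetry between the two call sites above: delete_intermediate_parquet_files is forwarded to the reducing tasks as delete_input_files, while delete_resume_log_files gates the final clean_resume_files() call. A hedged end-to-end sketch reusing the run(args, client) entry point from this file; the Dask client setup is an assumption:

    from dask.distributed import Client

    import hipscat_import.catalog.run_import as runner

    # Assumed local client configuration; args as constructed in the earlier sketch.
    client = Client(n_workers=1)
    runner.run(args, client)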
79 changes: 79 additions & 0 deletions tests/hipscat_import/catalog/test_run_round_trip.py
@@ -244,6 +244,85 @@ def test_import_constant_healpix_order(
     assert np.logical_and(ids >= 700, ids < 832).all()
 
 
+def assert_directory_contains(dir_name, expected_contents):
+    assert os.path.exists(dir_name)
+    actual_contents = os.listdir(dir_name)
+    actual_contents.sort()
+    npt.assert_array_equal(actual_contents, expected_contents)
+
+
+@pytest.mark.dask
+def test_import_keep_intermediate_files(
+    dask_client,
+    small_sky_parts_dir,
+    tmp_path,
+):
+    """Test that ALL intermediate files are still around on-disk after
+    successful import, when setting the appropriate flags.
+    """
+    temp = os.path.join(tmp_path, "intermediate_files")
+    os.makedirs(temp)
+    args = ImportArguments(
+        output_artifact_name="small_sky_object_catalog",
+        input_path=small_sky_parts_dir,
+        file_reader="csv",
+        output_path=tmp_path,
+        tmp_dir=temp,
+        dask_tmp=temp,
+        progress_bar=False,
+        delete_intermediate_parquet_files=False,
+        delete_resume_log_files=False,
+    )
+
+    runner.run(args, dask_client)
+
+    # Check that the catalog metadata file exists
+    catalog = Catalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+
+    ## Check that stage-level done files are still around.
+    base_intermediate_dir = os.path.join(temp, "small_sky_object_catalog", "intermediate")
+    expected_contents = [
+        "histograms",  # directory containing sub-histograms
+        "input_paths.txt",  # original input paths for subsequent comparison
+        "mapping_done",  # stage-level done file
+        "mapping_histogram.npz",  # concatenated histogram file
+        "order_0",  # all intermediate parquet files
+        "reducing",  # directory containing task-level done files
+        "reducing_done",  # stage-level done file
+        "splitting",  # directory containing task-level done files
+        "splitting_done",  # stage-level done file
+    ]
+    assert_directory_contains(base_intermediate_dir, expected_contents)
+
+    checking_dir = os.path.join(base_intermediate_dir, "histograms")
+    assert_directory_contains(
+        checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"]
+    )
+    checking_dir = os.path.join(base_intermediate_dir, "splitting")
+    assert_directory_contains(
+        checking_dir,
+        ["split_0_done", "split_1_done", "split_2_done", "split_3_done", "split_4_done", "split_5_done"],
+    )
+
+    checking_dir = os.path.join(base_intermediate_dir, "reducing")
+    assert_directory_contains(checking_dir, ["0_11_done"])
+
+    # Check that all of the intermediate parquet shards are still around.
+    checking_dir = os.path.join(base_intermediate_dir, "order_0", "dir_0", "pixel_11")
+    assert_directory_contains(
+        checking_dir,
+        [
+            "shard_split_0_0.parquet",
+            "shard_split_1_0.parquet",
+            "shard_split_2_0.parquet",
+            "shard_split_3_0.parquet",
+            "shard_split_4_0.parquet",
+        ],
+    )
+
+
 @pytest.mark.dask
 def test_import_lowest_healpix_order(
     dask_client,
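To exercise just the new round-trip test locally, an invocation along these lines should work; the test node ID comes from the diff above, while the exact pytest options depend on the repository's own test configuration:

    python -m pytest tests/hipscat_import/catalog/test_run_round_trip.py::test_import_keep_intermediate_files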
