From 06b8379fd148074e8cc058d3772d7c59f1aadc15 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Date: Thu, 16 May 2024 11:13:21 -0700
Subject: [PATCH] User arguments to retain intermediate files (#311)

---
 src/hipscat_import/catalog/arguments.py   |  8 ++
 src/hipscat_import/catalog/resume_plan.py | 13 ++-
 src/hipscat_import/catalog/run_import.py  |  4 +-
 .../catalog/test_run_round_trip.py         | 79 +++++++++++++++++++
 4 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/src/hipscat_import/catalog/arguments.py b/src/hipscat_import/catalog/arguments.py
index 7243b5a6..c2a0cfd3 100644
--- a/src/hipscat_import/catalog/arguments.py
+++ b/src/hipscat_import/catalog/arguments.py
@@ -68,6 +68,13 @@ class ImportArguments(RuntimeArguments):
     """healpix order to use when mapping. will be
     ``highest_healpix_order`` unless a positive value is provided for
     ``constant_healpix_order``"""
+    delete_intermediate_parquet_files: bool = True
+    """should we delete the smaller intermediate parquet files generated in the
+    splitting stage, once the relevant reducing stage is complete?"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all sub-histograms from the mapping stage, and all
+    done marker files at the end of the pipeline."""
     debug_stats_only: bool = False
     """do not perform a map reduce and don't create a new catalog.
     generate the partition info"""
@@ -125,6 +132,7 @@ def _check_arguments(self):
             progress_bar=self.progress_bar,
             input_paths=self.input_paths,
             tmp_path=self.resume_tmp,
+            delete_resume_log_files=self.delete_resume_log_files,
         )

     def to_catalog_info(self, total_rows) -> CatalogInfo:
diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py
index 1d7280de..2585d5cf 100644
--- a/src/hipscat_import/catalog/resume_plan.py
+++ b/src/hipscat_import/catalog/resume_plan.py
@@ -26,6 +26,10 @@ class ResumePlan(PipelineResumePlan):
     """set of files (and job keys) that have yet to be split"""
     destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
     """Fully resolved map of destination pixels to constituent smaller pixels"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all sub-histograms from the mapping stage, and all
+    done marker files at the end of the pipeline."""

     MAPPING_STAGE = "mapping"
     SPLITTING_STAGE = "splitting"
@@ -119,10 +123,11 @@ def read_histogram(self, healpix_order):
                 aggregate_histogram.add(SparseHistogram.from_file(partial_file_name))

             aggregate_histogram.to_file(file_name)
-            file_io.remove_directory(
-                file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
-                ignore_errors=True,
-            )
+            if self.delete_resume_log_files:
+                file_io.remove_directory(
+                    file_io.append_paths_to_pointer(self.tmp_path, self.HISTOGRAMS_DIR),
+                    ignore_errors=True,
+                )

         full_histogram = SparseHistogram.from_file(file_name).to_array()

diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py
index bfe60bde..3b9ebe7c 100644
--- a/src/hipscat_import/catalog/run_import.py
+++ b/src/hipscat_import/catalog/run_import.py
@@ -98,6 +98,7 @@ def _reduce_pixels(args, destination_pixel_map, client):
                 add_hipscat_index=args.add_hipscat_index,
                 use_schema_file=args.use_schema_file,
                 use_hipscat_index=args.use_hipscat_index,
+                delete_input_files=args.delete_intermediate_parquet_files,
                 storage_options=args.output_storage_options,
             )
         )
@@ -182,5 +183,6 @@ def run(args, client):
         step_progress.update(1)
         io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options)
         step_progress.update(1)
-        args.resume_plan.clean_resume_files()
+        if args.resume_plan.delete_resume_log_files:
+            args.resume_plan.clean_resume_files()
         step_progress.update(1)
diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py
index f61488de..11e8aaf7 100644
--- a/tests/hipscat_import/catalog/test_run_round_trip.py
+++ b/tests/hipscat_import/catalog/test_run_round_trip.py
@@ -244,6 +244,85 @@ def test_import_constant_healpix_order(
     assert np.logical_and(ids >= 700, ids < 832).all()


+def assert_directory_contains(dir_name, expected_contents):
+    assert os.path.exists(dir_name)
+    actual_contents = os.listdir(dir_name)
+    actual_contents.sort()
+    npt.assert_array_equal(actual_contents, expected_contents)
+
+
+@pytest.mark.dask
+def test_import_keep_intermediate_files(
+    dask_client,
+    small_sky_parts_dir,
+    tmp_path,
+):
+    """Test that ALL intermediate files are still around on-disk after
+    successful import, when setting the appropriate flags.
+    """
+    temp = os.path.join(tmp_path, "intermediate_files")
+    os.makedirs(temp)
+    args = ImportArguments(
+        output_artifact_name="small_sky_object_catalog",
+        input_path=small_sky_parts_dir,
+        file_reader="csv",
+        output_path=tmp_path,
+        tmp_dir=temp,
+        dask_tmp=temp,
+        progress_bar=False,
+        delete_intermediate_parquet_files=False,
+        delete_resume_log_files=False,
+    )
+
+    runner.run(args, dask_client)
+
+    # Check that the catalog metadata file exists
+    catalog = Catalog.read_from_hipscat(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+
+    ## Check that stage-level done files are still around.
+    base_intermediate_dir = os.path.join(temp, "small_sky_object_catalog", "intermediate")
+    expected_contents = [
+        "histograms",  # directory containing sub-histograms
+        "input_paths.txt",  # original input paths for subsequent comparison
+        "mapping_done",  # stage-level done file
+        "mapping_histogram.npz",  # concatenated histogram file
+        "order_0",  # all intermediate parquet files
+        "reducing",  # directory containing task-level done files
+        "reducing_done",  # stage-level done file
+        "splitting",  # directory containing task-level done files
+        "splitting_done",  # stage-level done file
+    ]
+    assert_directory_contains(base_intermediate_dir, expected_contents)
+
+    checking_dir = os.path.join(base_intermediate_dir, "histograms")
+    assert_directory_contains(
+        checking_dir, ["map_0.npz", "map_1.npz", "map_2.npz", "map_3.npz", "map_4.npz", "map_5.npz"]
+    )
+    checking_dir = os.path.join(base_intermediate_dir, "splitting")
+    assert_directory_contains(
+        checking_dir,
+        ["split_0_done", "split_1_done", "split_2_done", "split_3_done", "split_4_done", "split_5_done"],
+    )
+
+    checking_dir = os.path.join(base_intermediate_dir, "reducing")
+    assert_directory_contains(checking_dir, ["0_11_done"])
+
+    # Check that all of the intermediate parquet shards are still around.
+    checking_dir = os.path.join(base_intermediate_dir, "order_0", "dir_0", "pixel_11")
+    assert_directory_contains(
+        checking_dir,
+        [
+            "shard_split_0_0.parquet",
+            "shard_split_1_0.parquet",
+            "shard_split_2_0.parquet",
+            "shard_split_3_0.parquet",
+            "shard_split_4_0.parquet",
+        ],
+    )
+
+
 @pytest.mark.dask
 def test_import_lowest_healpix_order(
     dask_client,
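
Usage note: a minimal sketch of how a pipeline user could opt in to retaining
all intermediate files via the two new arguments. The argument names, their
defaults, and the run(args, client) entry point come from this patch; the
input/output paths and worker count below are placeholders. Both flags default
to True, so existing pipelines keep the current cleanup behavior.

from dask.distributed import Client

from hipscat_import.catalog import run_import
from hipscat_import.catalog.arguments import ImportArguments

# Placeholder paths -- substitute real catalog inputs and outputs.
args = ImportArguments(
    output_artifact_name="my_catalog",
    input_path="/data/my_catalog_csv_parts",
    file_reader="csv",
    output_path="/data/catalogs",
    # New in this patch: keep the parquet shards written by the splitting
    # stage instead of deleting them after each reduce task completes.
    delete_intermediate_parquet_files=False,
    # New in this patch: keep sub-histograms from the mapping stage and
    # all task-level done marker files after the pipeline finishes.
    delete_resume_log_files=False,
)

with Client(n_workers=4) as client:
    run_import.run(args, client)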