From 26596c933d845efdd99cffbc1fef59f5e7824e87 Mon Sep 17 00:00:00 2001 From: Melissa DeLucchi Date: Wed, 29 May 2024 16:37:05 -0400 Subject: [PATCH] Tidy up some resume functionality --- src/hipscat_import/catalog/resume_plan.py | 4 -- src/hipscat_import/catalog/run_import.py | 3 +- .../margin_cache/margin_cache.py | 6 +-- .../margin_cache/margin_cache_arguments.py | 6 +++ .../margin_cache/margin_cache_map_reduce.py | 11 ++-- .../margin_cache/margin_cache_resume_plan.py | 8 +-- src/hipscat_import/pipeline_resume_plan.py | 7 ++- src/hipscat_import/soap/arguments.py | 6 +++ src/hipscat_import/soap/resume_plan.py | 7 ++- src/hipscat_import/soap/run_soap.py | 1 + .../test_margin_cache_map_reduce.py | 20 ++++++++ .../test_margin_cache_resume_plan.py | 50 ++++++++----------- 12 files changed, 82 insertions(+), 47 deletions(-) diff --git a/src/hipscat_import/catalog/resume_plan.py b/src/hipscat_import/catalog/resume_plan.py index fe301ec4..604d8a91 100644 --- a/src/hipscat_import/catalog/resume_plan.py +++ b/src/hipscat_import/catalog/resume_plan.py @@ -26,10 +26,6 @@ class ResumePlan(PipelineResumePlan): """set of files (and job keys) that have yet to be split""" destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None """Fully resolved map of destination pixels to constituent smaller pixels""" - delete_resume_log_files: bool = True - """should we delete task-level done files once each stage is complete? - if False, we will keep all sub-histograms from the mapping stage, and all - done marker files at the end of the pipeline.""" MAPPING_STAGE = "mapping" SPLITTING_STAGE = "splitting" diff --git a/src/hipscat_import/catalog/run_import.py b/src/hipscat_import/catalog/run_import.py index 3b9ebe7c..1b97b547 100644 --- a/src/hipscat_import/catalog/run_import.py +++ b/src/hipscat_import/catalog/run_import.py @@ -183,6 +183,5 @@ def run(args, client): step_progress.update(1) io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options) step_progress.update(1) - if args.resume_plan.delete_resume_log_files: - args.resume_plan.clean_resume_files() + args.resume_plan.clean_resume_files() step_progress.update(1) diff --git a/src/hipscat_import/margin_cache/margin_cache.py b/src/hipscat_import/margin_cache/margin_cache.py index 9cb99045..95d46ecc 100644 --- a/src/hipscat_import/margin_cache/margin_cache.py +++ b/src/hipscat_import/margin_cache/margin_cache.py @@ -16,10 +16,7 @@ def generate_margin_cache(args, client): args (MarginCacheArguments): A valid `MarginCacheArguments` object. client (dask.distributed.Client): A dask distributed client object. """ - partition_pixels = args.catalog.partition_info.get_healpix_pixels() - negative_pixels = args.catalog.generate_negative_tree_pixels() - combined_pixels = partition_pixels + negative_pixels - resume_plan = MarginCachePlan(args, combined_pixels=combined_pixels, partition_pixels=partition_pixels) + resume_plan = MarginCachePlan(args) if not resume_plan.is_mapping_done(): futures = [] @@ -54,6 +51,7 @@ def generate_margin_cache(args, client): partition_order=pix.order, partition_pixel=pix.pixel, original_catalog_metadata=paths.get_common_metadata_pointer(args.input_catalog_path), + delete_intermediate_parquet_files=args.delete_intermediate_parquet_files, input_storage_options=args.input_storage_options, ) ) diff --git a/src/hipscat_import/margin_cache/margin_cache_arguments.py b/src/hipscat_import/margin_cache/margin_cache_arguments.py index 12b73bf2..4f30ae6e 100644 --- a/src/hipscat_import/margin_cache/margin_cache_arguments.py +++ b/src/hipscat_import/margin_cache/margin_cache_arguments.py @@ -24,6 +24,12 @@ class MarginCacheArguments(RuntimeArguments): order of healpix partitioning in the source catalog. if `margin_order` is left default or set to -1, then the `margin_order` will be set dynamically to the highest partition order plus 1.""" + delete_intermediate_parquet_files: bool = True + """should we delete the smaller intermediate parquet files generated in the + splitting stage, once the relevant reducing stage is complete?""" + delete_resume_log_files: bool = True + """should we delete task-level done files once each stage is complete? + if False, we will keep all done marker files at the end of the pipeline.""" input_catalog_path: str = "" """the path to the hipscat-formatted input catalog.""" diff --git a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py index f996c04c..629e27d9 100644 --- a/src/hipscat_import/margin_cache/margin_cache_map_reduce.py +++ b/src/hipscat_import/margin_cache/margin_cache_map_reduce.py @@ -118,6 +118,7 @@ def reduce_margin_shards( partition_order, partition_pixel, original_catalog_metadata, + delete_intermediate_parquet_files, input_storage_options, ): """Reduce all partition pixel directories into a single file""" @@ -128,8 +129,6 @@ def reduce_margin_shards( if file_io.does_file_or_directory_exist(shard_dir): data = ds.dataset(shard_dir, format="parquet") full_df = data.to_table().to_pandas() - margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel) - file_io.make_directory(margin_cache_dir, exist_ok=True, storage_options=output_storage_options) if len(full_df): schema = file_io.read_parquet_metadata( @@ -142,6 +141,11 @@ def reduce_margin_shards( .append(pa.field("margin_Npix", pa.uint64())) ) + margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel) + file_io.make_directory( + margin_cache_dir, exist_ok=True, storage_options=output_storage_options + ) + margin_cache_file_path = paths.pixel_catalog_file( output_path, partition_order, partition_pixel ) @@ -149,7 +153,8 @@ def reduce_margin_shards( full_df.to_parquet( margin_cache_file_path, schema=schema, storage_options=output_storage_options ) - file_io.remove_directory(shard_dir) + if delete_intermediate_parquet_files: + file_io.remove_directory(shard_dir) MarginCachePlan.reducing_key_done(intermediate_directory, reducing_key) except Exception as exception: # pylint: disable=broad-exception-caught diff --git a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py index 3fe58aeb..72aec80d 100644 --- a/src/hipscat_import/margin_cache/margin_cache_resume_plan.py +++ b/src/hipscat_import/margin_cache/margin_cache_resume_plan.py @@ -27,16 +27,15 @@ class MarginCachePlan(PipelineResumePlan): REDUCING_STAGE = "reducing" MARGIN_PAIR_FILE = "margin_pair.csv" - def __init__(self, args: MarginCacheArguments, combined_pixels, partition_pixels): + def __init__(self, args: MarginCacheArguments): if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy) raise ValueError("tmp_path is required") super().__init__( resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path, + delete_resume_log_files=args.delete_resume_log_files, ) - self.combined_pixels = combined_pixels - self.partition_pixels = partition_pixels self._gather_plan(args) def _gather_plan(self, args): @@ -52,6 +51,9 @@ def _gather_plan(self, args): raise ValueError("mapping must be complete before reducing") step_progress.update(1) + self.partition_pixels = args.catalog.partition_info.get_healpix_pixels() + negative_pixels = args.catalog.generate_negative_tree_pixels() + self.combined_pixels = self.partition_pixels + negative_pixels self.margin_pair_file = file_io.append_paths_to_pointer(self.tmp_path, self.MARGIN_PAIR_FILE) if not file_io.does_file_or_directory_exist(self.margin_pair_file): margin_pairs = _find_partition_margin_pixel_pairs(self.combined_pixels, args.margin_order) diff --git a/src/hipscat_import/pipeline_resume_plan.py b/src/hipscat_import/pipeline_resume_plan.py index f9ef4f60..7e859e91 100644 --- a/src/hipscat_import/pipeline_resume_plan.py +++ b/src/hipscat_import/pipeline_resume_plan.py @@ -25,6 +25,10 @@ class PipelineResumePlan: progress_bar: bool = True """if true, a tqdm progress bar will be displayed for user feedback of planning progress""" + delete_resume_log_files: bool = True + """should we delete task-level done files once each stage is complete? + if False, we will keep all sub-histograms from the mapping stage, and all + done marker files at the end of the pipeline.""" ORIGINAL_INPUT_PATHS = "input_paths.txt" @@ -109,7 +113,8 @@ def get_keys_from_file_names(directory, extension): def clean_resume_files(self): """Remove all intermediate files created in execution.""" - file_io.remove_directory(self.tmp_path, ignore_errors=True) + if self.delete_resume_log_files: + file_io.remove_directory(self.tmp_path, ignore_errors=True) def wait_for_futures(self, futures, stage_name, fail_fast=False): """Wait for collected futures to complete. diff --git a/src/hipscat_import/soap/arguments.py b/src/hipscat_import/soap/arguments.py index 71e4b93d..571b6371 100644 --- a/src/hipscat_import/soap/arguments.py +++ b/src/hipscat_import/soap/arguments.py @@ -28,9 +28,15 @@ class SoapArguments(RuntimeArguments): resume: bool = True """if there are existing intermediate resume files, should we read those and continue to run the pipeline where we left off""" + delete_resume_log_files: bool = True + """should we delete task-level done files once each stage is complete? + if False, we will keep all done marker files at the end of the pipeline.""" write_leaf_files: bool = False """Should we also write out leaf parquet files (e.g. Norder/Dir/Npix.parquet) that represent the full association table""" + delete_intermediate_parquet_files: bool = True + """should we delete the smaller intermediate parquet files generated in the + mapping stage, once the relevant reducing stage is complete?""" compute_partition_size: int = 1_000_000_000 diff --git a/src/hipscat_import/soap/resume_plan.py b/src/hipscat_import/soap/resume_plan.py index 11d1384d..be1a6f86 100644 --- a/src/hipscat_import/soap/resume_plan.py +++ b/src/hipscat_import/soap/resume_plan.py @@ -36,7 +36,12 @@ class SoapPlan(PipelineResumePlan): def __init__(self, args: SoapArguments): if not args.tmp_path: # pragma: no cover (not reachable, but required for mypy) raise ValueError("tmp_path is required") - super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path) + super().__init__( + resume=args.resume, + progress_bar=args.progress_bar, + tmp_path=args.tmp_path, + delete_resume_log_files=args.delete_resume_log_files, + ) self.gather_plan(args) def gather_plan(self, args): diff --git a/src/hipscat_import/soap/run_soap.py b/src/hipscat_import/soap/run_soap.py index 6d4fad71..dafafae4 100644 --- a/src/hipscat_import/soap/run_soap.py +++ b/src/hipscat_import/soap/run_soap.py @@ -43,6 +43,7 @@ def run(args, client): soap_args=args, object_pixel=object_pixel, object_key=object_key, + delete_input_files=args.delete_intermediate_parquet_files, ) ) diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py index a9d40ee9..41fc2177 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py @@ -144,6 +144,25 @@ def test_reduce_margin_shards(tmp_path): 1, 21, original_catalog_metadata=schema_path, + delete_intermediate_parquet_files=False, + input_storage_options=None, + ) + + result_path = paths.pixel_catalog_file(tmp_path, 1, 21) + + validate_result_dataframe(result_path, 720) + assert os.path.exists(shard_dir) + + # Run again with delete_intermediate_parquet_files. shard_dir doesn't exist at the end. + margin_cache_map_reduce.reduce_margin_shards( + intermediate_dir, + "1_21", + tmp_path, + None, + 1, + 21, + original_catalog_metadata=schema_path, + delete_intermediate_parquet_files=True, input_storage_options=None, ) @@ -177,6 +196,7 @@ def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys): 1, 21, original_catalog_metadata=schema_path, + delete_intermediate_parquet_files=True, input_storage_options=None, ) diff --git a/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py b/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py index 00cb97c2..baeffba1 100644 --- a/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py +++ b/tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py @@ -1,7 +1,7 @@ import numpy as np import numpy.testing as npt import pytest -from hipscat.pixel_math.healpix_pixel import HealpixPixel +from hipscat.catalog import Catalog from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments from hipscat_import.margin_cache.margin_cache_resume_plan import ( @@ -13,10 +13,10 @@ @pytest.fixture -def small_sky_margin_args(tmp_path, small_sky_source_catalog): +def small_sky_margin_args(tmp_path, small_sky_object_catalog): return MarginCacheArguments( margin_threshold=5.0, - input_catalog_path=small_sky_source_catalog, + input_catalog_path=small_sky_object_catalog, output_path=tmp_path, output_artifact_name="catalog_cache", progress_bar=False, @@ -28,8 +28,7 @@ def test_done_checks(small_sky_margin_args): """Verify that done files imply correct pipeline execution order: mapping > reducing """ - pixels = [HealpixPixel(0, 11)] - plan = MarginCachePlan(small_sky_margin_args, pixels, pixels) + plan = MarginCachePlan(small_sky_margin_args) plan.touch_stage_done_file(MarginCachePlan.REDUCING_STAGE) with pytest.raises(ValueError, match="before reducing"): @@ -42,7 +41,7 @@ def test_done_checks(small_sky_margin_args): plan.clean_resume_files() - plan = MarginCachePlan(small_sky_margin_args, pixels, pixels) + plan = MarginCachePlan(small_sky_margin_args) plan.touch_stage_done_file(MarginCachePlan.MAPPING_STAGE) plan._gather_plan(small_sky_margin_args) @@ -58,24 +57,16 @@ def never_fails(): @pytest.mark.dask def test_some_map_task_failures(small_sky_margin_args, dask_client): """Test that we only consider map stage successful if all done files are written""" - pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)] - plan = MarginCachePlan(small_sky_margin_args, pixels, pixels) + plan = MarginCachePlan(small_sky_margin_args) ## Method doesn't FAIL, but it doesn't write out the done file either. ## Since the intermediate files aren't found, we throw an error. futures = [dask_client.submit(never_fails)] - with pytest.raises(RuntimeError, match="2 mapping stages"): + with pytest.raises(RuntimeError, match="1 mapping stages"): plan.wait_for_mapping(futures) MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_11") - ## Method succeeds, but only *ONE* done file is present. - futures = [dask_client.submit(never_fails)] - with pytest.raises(RuntimeError, match="1 mapping stage"): - plan.wait_for_mapping(futures) - - MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_10") - ## Method succeeds, *and* done files are present. futures = [dask_client.submit(never_fails)] plan.wait_for_mapping(futures) @@ -84,34 +75,33 @@ def test_some_map_task_failures(small_sky_margin_args, dask_client): @pytest.mark.dask def test_some_reducing_task_failures(small_sky_margin_args, dask_client): """Test that we only consider reduce stage successful if all done files are written""" - pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)] - plan = MarginCachePlan(small_sky_margin_args, pixels, pixels) + plan = MarginCachePlan(small_sky_margin_args) ## Method doesn't FAIL, but it doesn't write out the done file either. ## Since the intermediate files aren't found, we throw an error. futures = [dask_client.submit(never_fails)] - with pytest.raises(RuntimeError, match="2 reducing stages"): + with pytest.raises(RuntimeError, match="12 reducing stages"): plan.wait_for_reducing(futures) MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_11") ## Method succeeds, but only *ONE* done file is present. futures = [dask_client.submit(never_fails)] - with pytest.raises(RuntimeError, match="1 reducing stage"): + with pytest.raises(RuntimeError, match="11 reducing stages"): plan.wait_for_reducing(futures) - MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_10") + for partition in range(0, 12): + MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, f"0_{partition}") ## Method succeeds, *and* done files are present. futures = [dask_client.submit(never_fails)] plan.wait_for_reducing(futures) -def test_partition_margin_pixel_pairs(small_sky_margin_args): +def test_partition_margin_pixel_pairs(small_sky_source_catalog): """Ensure partition_margin_pixel_pairs can generate main partition pixels.""" - margin_pairs = _find_partition_margin_pixel_pairs( - small_sky_margin_args.catalog.partition_info.get_healpix_pixels(), small_sky_margin_args.margin_order - ) + source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog) + margin_pairs = _find_partition_margin_pixel_pairs(source_catalog.get_healpix_pixels(), 3) expected = np.array([725, 733, 757, 765, 727, 735, 759, 767, 469, 192]) @@ -119,13 +109,15 @@ def test_partition_margin_pixel_pairs(small_sky_margin_args): assert len(margin_pairs) == 196 -def test_partition_margin_pixel_pairs_negative(small_sky_margin_args): +def test_partition_margin_pixel_pairs_negative(small_sky_source_catalog): """Ensure partition_margin_pixel_pairs can generate negative tree pixels.""" - partition_stats = small_sky_margin_args.catalog.partition_info.get_healpix_pixels() - negative_pixels = small_sky_margin_args.catalog.generate_negative_tree_pixels() + source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog) + + partition_stats = source_catalog.get_healpix_pixels() + negative_pixels = source_catalog.generate_negative_tree_pixels() combined_pixels = partition_stats + negative_pixels - margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, small_sky_margin_args.margin_order) + margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, 3) expected_order = 0 expected_pixel = 10