Tidy up some resume functionality #324

Merged · 1 commit · May 30, 2024
4 changes: 0 additions & 4 deletions src/hipscat_import/catalog/resume_plan.py

@@ -26,10 +26,6 @@ class ResumePlan(PipelineResumePlan):
     """set of files (and job keys) that have yet to be split"""
     destination_pixel_map: Optional[List[Tuple[HealpixPixel, List[HealpixPixel], str]]] = None
     """Fully resolved map of destination pixels to constituent smaller pixels"""
-    delete_resume_log_files: bool = True
-    """should we delete task-level done files once each stage is complete?
-    if False, we will keep all sub-histograms from the mapping stage, and all
-    done marker files at the end of the pipeline."""

     MAPPING_STAGE = "mapping"
     SPLITTING_STAGE = "splitting"
3 changes: 1 addition & 2 deletions src/hipscat_import/catalog/run_import.py

@@ -183,6 +183,5 @@ def run(args, client):
         step_progress.update(1)
         io.write_fits_map(args.catalog_path, raw_histogram, storage_options=args.output_storage_options)
         step_progress.update(1)
-        if args.resume_plan.delete_resume_log_files:
-            args.resume_plan.clean_resume_files()
+        args.resume_plan.clean_resume_files()
         step_progress.update(1)
6 changes: 2 additions & 4 deletions src/hipscat_import/margin_cache/margin_cache.py

@@ -16,10 +16,7 @@ def generate_margin_cache(args, client):
         args (MarginCacheArguments): A valid `MarginCacheArguments` object.
         client (dask.distributed.Client): A dask distributed client object.
     """
-    partition_pixels = args.catalog.partition_info.get_healpix_pixels()
-    negative_pixels = args.catalog.generate_negative_tree_pixels()
-    combined_pixels = partition_pixels + negative_pixels
-    resume_plan = MarginCachePlan(args, combined_pixels=combined_pixels, partition_pixels=partition_pixels)
+    resume_plan = MarginCachePlan(args)

     if not resume_plan.is_mapping_done():
         futures = []

@@ -54,6 +51,7 @@ def generate_margin_cache(args, client):
                     partition_order=pix.order,
                     partition_pixel=pix.pixel,
                     original_catalog_metadata=paths.get_common_metadata_pointer(args.input_catalog_path),
+                    delete_intermediate_parquet_files=args.delete_intermediate_parquet_files,
                     input_storage_options=args.input_storage_options,
                 )
             )
6 changes: 6 additions & 0 deletions src/hipscat_import/margin_cache/margin_cache_arguments.py

@@ -24,6 +24,12 @@ class MarginCacheArguments(RuntimeArguments):
     order of healpix partitioning in the source catalog. if `margin_order` is left
     default or set to -1, then the `margin_order` will be set dynamically to the
     highest partition order plus 1."""
+    delete_intermediate_parquet_files: bool = True
+    """should we delete the smaller intermediate parquet files generated in the
+    splitting stage, once the relevant reducing stage is complete?"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all done marker files at the end of the pipeline."""

     input_catalog_path: str = ""
     """the path to the hipscat-formatted input catalog."""
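For orientation, here is a minimal sketch of how a caller might opt out of both cleanup behaviors when generating a margin cache. The keyword arguments mirror the test fixture further down in this diff; the catalog paths and the dask client setup are hypothetical stand-ins.

from dask.distributed import Client

from hipscat_import.margin_cache.margin_cache import generate_margin_cache
from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments

args = MarginCacheArguments(
    margin_threshold=5.0,
    input_catalog_path="data/my_object_catalog",  # hypothetical input path
    output_path="output",  # hypothetical output path
    output_artifact_name="my_catalog_margin_cache",
    delete_intermediate_parquet_files=False,  # keep per-shard parquet files for inspection
    delete_resume_log_files=False,  # keep done-marker files after the pipeline completes
)

client = Client(n_workers=4)
generate_margin_cache(args, client)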
11 changes: 8 additions & 3 deletions src/hipscat_import/margin_cache/margin_cache_map_reduce.py

@@ -118,6 +118,7 @@ def reduce_margin_shards(
     partition_order,
     partition_pixel,
     original_catalog_metadata,
+    delete_intermediate_parquet_files,
     input_storage_options,
 ):
     """Reduce all partition pixel directories into a single file"""

@@ -128,8 +129,6 @@
         if file_io.does_file_or_directory_exist(shard_dir):
             data = ds.dataset(shard_dir, format="parquet")
             full_df = data.to_table().to_pandas()
-            margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
-            file_io.make_directory(margin_cache_dir, exist_ok=True, storage_options=output_storage_options)

             if len(full_df):
                 schema = file_io.read_parquet_metadata(

@@ -142,14 +141,20 @@
                     .append(pa.field("margin_Npix", pa.uint64()))
                 )

+                margin_cache_dir = paths.pixel_directory(output_path, partition_order, partition_pixel)
+                file_io.make_directory(
+                    margin_cache_dir, exist_ok=True, storage_options=output_storage_options
+                )
+
                 margin_cache_file_path = paths.pixel_catalog_file(
                     output_path, partition_order, partition_pixel
                 )

                 full_df.to_parquet(
                     margin_cache_file_path, schema=schema, storage_options=output_storage_options
                 )
-                file_io.remove_directory(shard_dir)
+                if delete_intermediate_parquet_files:
+                    file_io.remove_directory(shard_dir)

         MarginCachePlan.reducing_key_done(intermediate_directory, reducing_key)
     except Exception as exception:  # pylint: disable=broad-exception-caught
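Because the moved directory-creation code and the new guard are split across three hunks, the net control flow can be hard to see. The following self-contained sketch reproduces the same pattern with stand-in helpers (pandas and shutil in place of hipscat's file_io and paths utilities): the output directory is created only once there are rows to write, and the shard directory is removed only when the flag allows it.

import shutil
from pathlib import Path

import pandas as pd


def reduce_shards_sketch(shard_dir, output_file, delete_intermediate_parquet_files=True):
    """Stand-in for reduce_margin_shards: combine shard files, then optionally clean up."""
    shards = sorted(Path(shard_dir).glob("*.parquet"))
    if not shards:
        return
    full_df = pd.concat(pd.read_parquet(shard) for shard in shards)
    if len(full_df):
        # Create the destination directory only when there is data to write.
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        full_df.to_parquet(output_file)
        # The new flag gates removal of the intermediate shard directory.
        if delete_intermediate_parquet_files:
            shutil.rmtree(shard_dir, ignore_errors=True)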
8 changes: 5 additions & 3 deletions src/hipscat_import/margin_cache/margin_cache_resume_plan.py

@@ -27,16 +27,15 @@ class MarginCachePlan(PipelineResumePlan):
     REDUCING_STAGE = "reducing"
     MARGIN_PAIR_FILE = "margin_pair.csv"

-    def __init__(self, args: MarginCacheArguments, combined_pixels, partition_pixels):
+    def __init__(self, args: MarginCacheArguments):
         if not args.tmp_path:  # pragma: no cover (not reachable, but required for mypy)
             raise ValueError("tmp_path is required")
         super().__init__(
             resume=args.resume,
             progress_bar=args.progress_bar,
             tmp_path=args.tmp_path,
+            delete_resume_log_files=args.delete_resume_log_files,
         )
-        self.combined_pixels = combined_pixels
-        self.partition_pixels = partition_pixels
         self._gather_plan(args)

     def _gather_plan(self, args):

@@ -52,6 +51,9 @@ def _gather_plan(self, args):
                 raise ValueError("mapping must be complete before reducing")
             step_progress.update(1)

+            self.partition_pixels = args.catalog.partition_info.get_healpix_pixels()
+            negative_pixels = args.catalog.generate_negative_tree_pixels()
+            self.combined_pixels = self.partition_pixels + negative_pixels
             self.margin_pair_file = file_io.append_paths_to_pointer(self.tmp_path, self.MARGIN_PAIR_FILE)
             if not file_io.does_file_or_directory_exist(self.margin_pair_file):
                 margin_pairs = _find_partition_margin_pixel_pairs(self.combined_pixels, args.margin_order)
7 changes: 6 additions & 1 deletion src/hipscat_import/pipeline_resume_plan.py

@@ -25,6 +25,10 @@ class PipelineResumePlan:
     progress_bar: bool = True
     """if true, a tqdm progress bar will be displayed for user
     feedback of planning progress"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all sub-histograms from the mapping stage, and all
+    done marker files at the end of the pipeline."""

     ORIGINAL_INPUT_PATHS = "input_paths.txt"

@@ -109,7 +113,8 @@ def get_keys_from_file_names(directory, extension):

     def clean_resume_files(self):
         """Remove all intermediate files created in execution."""
-        file_io.remove_directory(self.tmp_path, ignore_errors=True)
+        if self.delete_resume_log_files:
+            file_io.remove_directory(self.tmp_path, ignore_errors=True)

     def wait_for_futures(self, futures, stage_name, fail_fast=False):
         """Wait for collected futures to complete.
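The base-class change above is the crux of this PR: the guard now lives inside clean_resume_files, so call sites (like run_import.py earlier in this diff) can invoke it unconditionally. A minimal sketch of the pattern, with shutil standing in for hipscat's file_io helper:

import shutil
from dataclasses import dataclass


@dataclass
class ResumePlanSketch:
    tmp_path: str
    delete_resume_log_files: bool = True

    def clean_resume_files(self):
        """Remove intermediate files only when the plan is configured to do so."""
        if self.delete_resume_log_files:
            shutil.rmtree(self.tmp_path, ignore_errors=True)


plan = ResumePlanSketch(tmp_path="/tmp/hipscat_resume", delete_resume_log_files=False)
plan.clean_resume_files()  # no-op: resume log files are kept for later inspection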
6 changes: 6 additions & 0 deletions src/hipscat_import/soap/arguments.py

@@ -28,9 +28,15 @@ class SoapArguments(RuntimeArguments):
     resume: bool = True
     """if there are existing intermediate resume files, should we
     read those and continue to run the pipeline where we left off"""
+    delete_resume_log_files: bool = True
+    """should we delete task-level done files once each stage is complete?
+    if False, we will keep all done marker files at the end of the pipeline."""
     write_leaf_files: bool = False
     """Should we also write out leaf parquet files (e.g. Norder/Dir/Npix.parquet)
     that represent the full association table"""
+    delete_intermediate_parquet_files: bool = True
+    """should we delete the smaller intermediate parquet files generated in the
+    mapping stage, once the relevant reducing stage is complete?"""

     compute_partition_size: int = 1_000_000_000
7 changes: 6 additions & 1 deletion src/hipscat_import/soap/resume_plan.py

@@ -36,7 +36,12 @@ class SoapPlan(PipelineResumePlan):
     def __init__(self, args: SoapArguments):
         if not args.tmp_path:  # pragma: no cover (not reachable, but required for mypy)
             raise ValueError("tmp_path is required")
-        super().__init__(resume=args.resume, progress_bar=args.progress_bar, tmp_path=args.tmp_path)
+        super().__init__(
+            resume=args.resume,
+            progress_bar=args.progress_bar,
+            tmp_path=args.tmp_path,
+            delete_resume_log_files=args.delete_resume_log_files,
+        )
         self.gather_plan(args)

     def gather_plan(self, args):
1 change: 1 addition & 0 deletions src/hipscat_import/soap/run_soap.py

@@ -43,6 +43,7 @@ def run(args, client):
                     soap_args=args,
                     object_pixel=object_pixel,
                     object_key=object_key,
+                    delete_input_files=args.delete_intermediate_parquet_files,
                 )
             )
20 changes: 20 additions & 0 deletions tests/hipscat_import/margin_cache/test_margin_cache_map_reduce.py

@@ -144,6 +145,25 @@ def test_reduce_margin_shards(tmp_path):
         1,
         21,
         original_catalog_metadata=schema_path,
+        delete_intermediate_parquet_files=False,
         input_storage_options=None,
     )

+    result_path = paths.pixel_catalog_file(tmp_path, 1, 21)
+
+    validate_result_dataframe(result_path, 720)
+    assert os.path.exists(shard_dir)
+
+    # Run again with delete_intermediate_parquet_files. shard_dir doesn't exist at the end.
+    margin_cache_map_reduce.reduce_margin_shards(
+        intermediate_dir,
+        "1_21",
+        tmp_path,
+        None,
+        1,
+        21,
+        original_catalog_metadata=schema_path,
+        delete_intermediate_parquet_files=True,
+        input_storage_options=None,
+    )

@@ -177,6 +196,7 @@ def test_reduce_margin_shards_error(tmp_path, basic_data_shard_df, capsys):
         1,
         21,
         original_catalog_metadata=schema_path,
+        delete_intermediate_parquet_files=True,
         input_storage_options=None,
     )
50 changes: 21 additions & 29 deletions tests/hipscat_import/margin_cache/test_margin_cache_resume_plan.py

@@ -1,7 +1,7 @@
 import numpy as np
 import numpy.testing as npt
 import pytest
-from hipscat.pixel_math.healpix_pixel import HealpixPixel
+from hipscat.catalog import Catalog

 from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments
 from hipscat_import.margin_cache.margin_cache_resume_plan import (

@@ -13,10 +13,10 @@


 @pytest.fixture
-def small_sky_margin_args(tmp_path, small_sky_source_catalog):
+def small_sky_margin_args(tmp_path, small_sky_object_catalog):
     return MarginCacheArguments(
         margin_threshold=5.0,
-        input_catalog_path=small_sky_source_catalog,
+        input_catalog_path=small_sky_object_catalog,
         output_path=tmp_path,
         output_artifact_name="catalog_cache",
         progress_bar=False,

@@ -28,8 +28,7 @@ def test_done_checks(small_sky_margin_args):
     """Verify that done files imply correct pipeline execution order:
     mapping > reducing
     """
-    pixels = [HealpixPixel(0, 11)]
-    plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
+    plan = MarginCachePlan(small_sky_margin_args)
     plan.touch_stage_done_file(MarginCachePlan.REDUCING_STAGE)

     with pytest.raises(ValueError, match="before reducing"):

@@ -42,7 +41,7 @@ def test_done_checks(small_sky_margin_args):

     plan.clean_resume_files()

-    plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
+    plan = MarginCachePlan(small_sky_margin_args)
     plan.touch_stage_done_file(MarginCachePlan.MAPPING_STAGE)
     plan._gather_plan(small_sky_margin_args)

@@ -58,24 +57,16 @@ def never_fails():
 @pytest.mark.dask
 def test_some_map_task_failures(small_sky_margin_args, dask_client):
     """Test that we only consider map stage successful if all done files are written"""
-    pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)]
-    plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
+    plan = MarginCachePlan(small_sky_margin_args)

     ## Method doesn't FAIL, but it doesn't write out the done file either.
     ## Since the intermediate files aren't found, we throw an error.
     futures = [dask_client.submit(never_fails)]
-    with pytest.raises(RuntimeError, match="2 mapping stages"):
+    with pytest.raises(RuntimeError, match="1 mapping stages"):
         plan.wait_for_mapping(futures)

     MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_11")

-    ## Method succeeds, but only *ONE* done file is present.
-    futures = [dask_client.submit(never_fails)]
-    with pytest.raises(RuntimeError, match="1 mapping stage"):
-        plan.wait_for_mapping(futures)
-
-    MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.MAPPING_STAGE, "0_10")
-
     ## Method succeeds, *and* done files are present.
     futures = [dask_client.submit(never_fails)]
     plan.wait_for_mapping(futures)

@@ -84,48 +75,49 @@ def test_some_map_task_failures(small_sky_margin_args, dask_client):
 @pytest.mark.dask
 def test_some_reducing_task_failures(small_sky_margin_args, dask_client):
     """Test that we only consider reduce stage successful if all done files are written"""
-    pixels = [HealpixPixel(0, 10), HealpixPixel(0, 11)]
-    plan = MarginCachePlan(small_sky_margin_args, pixels, pixels)
+    plan = MarginCachePlan(small_sky_margin_args)

     ## Method doesn't FAIL, but it doesn't write out the done file either.
     ## Since the intermediate files aren't found, we throw an error.
     futures = [dask_client.submit(never_fails)]
-    with pytest.raises(RuntimeError, match="2 reducing stages"):
+    with pytest.raises(RuntimeError, match="12 reducing stages"):
         plan.wait_for_reducing(futures)

     MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_11")

     ## Method succeeds, but only *ONE* done file is present.
     futures = [dask_client.submit(never_fails)]
-    with pytest.raises(RuntimeError, match="1 reducing stage"):
+    with pytest.raises(RuntimeError, match="11 reducing stages"):
         plan.wait_for_reducing(futures)

-    MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, "0_10")
+    for partition in range(0, 12):
+        MarginCachePlan.touch_key_done_file(plan.tmp_path, MarginCachePlan.REDUCING_STAGE, f"0_{partition}")

     ## Method succeeds, *and* done files are present.
     futures = [dask_client.submit(never_fails)]
     plan.wait_for_reducing(futures)


-def test_partition_margin_pixel_pairs(small_sky_margin_args):
+def test_partition_margin_pixel_pairs(small_sky_source_catalog):
     """Ensure partition_margin_pixel_pairs can generate main partition pixels."""
-    margin_pairs = _find_partition_margin_pixel_pairs(
-        small_sky_margin_args.catalog.partition_info.get_healpix_pixels(), small_sky_margin_args.margin_order
-    )
+    source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog)
+    margin_pairs = _find_partition_margin_pixel_pairs(source_catalog.get_healpix_pixels(), 3)

     expected = np.array([725, 733, 757, 765, 727, 735, 759, 767, 469, 192])

     npt.assert_array_equal(margin_pairs.iloc[:10]["margin_pixel"], expected)
     assert len(margin_pairs) == 196


-def test_partition_margin_pixel_pairs_negative(small_sky_margin_args):
+def test_partition_margin_pixel_pairs_negative(small_sky_source_catalog):
     """Ensure partition_margin_pixel_pairs can generate negative tree pixels."""
-    partition_stats = small_sky_margin_args.catalog.partition_info.get_healpix_pixels()
-    negative_pixels = small_sky_margin_args.catalog.generate_negative_tree_pixels()
+    source_catalog = Catalog.read_from_hipscat(small_sky_source_catalog)
+
+    partition_stats = source_catalog.get_healpix_pixels()
+    negative_pixels = source_catalog.generate_negative_tree_pixels()
     combined_pixels = partition_stats + negative_pixels

-    margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, small_sky_margin_args.margin_order)
+    margin_pairs = _find_partition_margin_pixel_pairs(combined_pixels, 3)

     expected_order = 0
     expected_pixel = 10