diff --git a/src/hipscat_import/association/__init__.py b/src/hipscat_import/association/__init__.py new file mode 100644 index 00000000..4d721422 --- /dev/null +++ b/src/hipscat_import/association/__init__.py @@ -0,0 +1,5 @@ +"""Modules for creating a new association table from an equijoin between two catalogs""" + +from .arguments import AssociationArguments +from .map_reduce import map_association, reduce_association +from .run_association import run diff --git a/src/hipscat_import/association/arguments.py b/src/hipscat_import/association/arguments.py new file mode 100644 index 00000000..28b8f852 --- /dev/null +++ b/src/hipscat_import/association/arguments.py @@ -0,0 +1,74 @@ +"""Utility to hold all arguments required throughout association pipeline""" + +from dataclasses import dataclass + +from hipscat.catalog import CatalogParameters + +from hipscat_import.runtime_arguments import RuntimeArguments + +# pylint: disable=too-many-instance-attributes + + +@dataclass +class AssociationArguments(RuntimeArguments): + """Data class for holding association arguments""" + + ## Input - Primary + primary_input_catalog_path: str = "" + primary_id_column: str = "" + primary_join_column: str = "" + + ## Input - Join + join_input_catalog_path: str = "" + join_id_column: str = "" + join_foreign_key: str = "" + + compute_partition_size: int = 1_000_000_000 + + def __post_init__(self): + self._check_arguments() + + def _check_arguments(self): + super()._check_arguments() + if not self.primary_input_catalog_path: + raise ValueError("primary_input_catalog_path is required") + if not self.primary_id_column: + raise ValueError("primary_id_column is required") + if not self.primary_join_column: + raise ValueError("primary_join_column is required") + if self.primary_id_column in ["primary_id", "join_id"]: + raise ValueError("primary_id_column uses a reserved column name") + + if not self.join_input_catalog_path: + raise ValueError("join_input_catalog_path is required") + if not self.join_id_column: + raise ValueError("join_id_column is required") + if not self.join_foreign_key: + raise ValueError("join_foreign_key is required") + if self.join_id_column in ["primary_id", "join_id"]: + raise ValueError("join_id_column uses a reserved column name") + + if self.compute_partition_size < 100_000: + raise ValueError("compute_partition_size must be at least 100_000") + + def to_catalog_parameters(self) -> CatalogParameters: + """Convert importing arguments into hipscat catalog parameters. + + Returns: + CatalogParameters for catalog being created. 
+ """ + return CatalogParameters( + catalog_name=self.output_catalog_name, + catalog_type="association", + output_path=self.output_path, + ) + + def additional_runtime_provenance_info(self): + return { + "primary_input_catalog_path": str(self.primary_input_catalog_path), + "primary_id_column": self.primary_id_column, + "primary_join_column": self.primary_join_column, + "join_input_catalog_path": str(self.join_input_catalog_path), + "join_id_column": self.join_id_column, + "join_foreign_key": self.join_foreign_key, + } diff --git a/src/hipscat_import/association/map_reduce.py b/src/hipscat_import/association/map_reduce.py new file mode 100644 index 00000000..ebb6ece5 --- /dev/null +++ b/src/hipscat_import/association/map_reduce.py @@ -0,0 +1,202 @@ +"""Create partitioned association table between two catalogs""" + +import dask.dataframe as dd +import pyarrow.parquet as pq +from hipscat.io import file_io, paths + + +def map_association(args): + """Using dask dataframes, create an association between two catalogs. + This will write out sharded parquet files to the temp (intermediate) + directory. + + Implementation notes: + + Because we may be joining to a column that is NOT the natural/primary key + (on either side of the join), we fetch both the identifying column and the + join predicate column, possibly duplicating one of the columns. + + This way, when we drop the join predicate columns at the end of the process, + we will still have the identifying columns. However, it makes the loading + of each input catalog more verbose. + """ + ## Read and massage primary input data + ## NB: We may be joining on a column that is NOT the natural primary key. + single_primary_column = args.primary_id_column == args.primary_join_column + read_columns = [ + "Norder", + "Dir", + "Npix", + ] + if single_primary_column: + read_columns = [args.primary_id_column] + read_columns + else: + read_columns = [args.primary_join_column, args.primary_id_column] + read_columns + + primary_index = dd.read_parquet( + path=args.primary_input_catalog_path, + columns=read_columns, + dataset={"partitioning": "hive"}, + ) + if single_primary_column: + ## Duplicate the column to simplify later steps + primary_index["primary_id"] = primary_index[args.primary_join_column] + rename_columns = { + args.primary_join_column: "primary_join", + "_hipscat_index": "primary_hipscat_index", + } + if not single_primary_column: + rename_columns[args.primary_id_column] = "primary_id" + primary_index = ( + primary_index.reset_index() + .rename(columns=rename_columns) + .set_index("primary_join") + ) + + ## Read and massage join input data + single_join_column = args.join_id_column == args.join_foreign_key + read_columns = [ + "Norder", + "Dir", + "Npix", + ] + if single_join_column: + read_columns = [args.join_id_column] + read_columns + else: + read_columns = [args.join_id_column, args.join_foreign_key] + read_columns + + join_index = dd.read_parquet( + path=args.join_input_catalog_path, + columns=read_columns, + dataset={"partitioning": "hive"}, + ) + if single_join_column: + ## Duplicate the column to simplify later steps + join_index["join_id"] = join_index[args.join_id_column] + rename_columns = { + args.join_foreign_key: "join_to_primary", + "_hipscat_index": "join_hipscat_index", + "Norder": "join_Norder", + "Dir": "join_Dir", + "Npix": "join_Npix", + } + if not single_join_column: + rename_columns[args.join_id_column] = "join_id" + join_index = ( + join_index.reset_index() + .rename(columns=rename_columns) + 
.set_index("join_to_primary") + ) + + ## Join the two data sets on the shared join predicate. + join_data = primary_index.merge( + join_index, how="inner", left_index=True, right_index=True + ) + + ## Write out a summary of each partition join + groups = ( + join_data.groupby( + ["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix"], + group_keys=False, + )["primary_hipscat_index"] + .count() + .compute() + ) + intermediate_partitions_file = file_io.append_paths_to_pointer( + args.tmp_path, "partitions.csv" + ) + file_io.write_dataframe_to_csv( + dataframe=groups, file_pointer=intermediate_partitions_file + ) + + ## Drop join predicate columns + join_data = join_data[ + [ + "Norder", + "Dir", + "Npix", + "join_Norder", + "join_Dir", + "join_Npix", + "primary_id", + "primary_hipscat_index", + "join_id", + "join_hipscat_index", + ] + ] + + ## Write out association table shards. + join_data.to_parquet( + path=args.tmp_path, + engine="pyarrow", + partition_on=["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix"], + compute_kwargs={"partition_size": args.compute_partition_size}, + write_index=False, + ) + + +def reduce_association(input_path, output_path): + """Collate sharded parquet files into a single parquet file per partition""" + intermediate_partitions_file = file_io.append_paths_to_pointer( + input_path, "partitions.csv" + ) + data_frame = file_io.load_csv_to_pandas(intermediate_partitions_file) + + ## Clean up the dataframe and write out as our new partition join info file. + data_frame = data_frame[data_frame["primary_hipscat_index"] != 0] + data_frame["num_rows"] = data_frame["primary_hipscat_index"] + data_frame = data_frame[ + ["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix", "num_rows"] + ] + data_frame = data_frame.sort_values(["Norder", "Npix", "join_Norder", "join_Npix"]) + file_io.write_dataframe_to_csv( + dataframe=data_frame, + file_pointer=file_io.append_paths_to_pointer( + output_path, "partition_join_info.csv" + ), + index=False, + ) + + ## For each partition, join all parquet shards into single parquet file. 
+ for _, partition in data_frame.iterrows(): + input_dir = paths.create_hive_directory_name( + input_path, + ["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix"], + [ + partition["Norder"], + partition["Dir"], + partition["Npix"], + partition["join_Norder"], + partition["join_Dir"], + partition["join_Npix"], + ], + ) + output_dir = paths.pixel_association_directory( + output_path, + partition["Norder"], + partition["Npix"], + partition["join_Norder"], + partition["join_Npix"], + ) + file_io.make_directory(output_dir, exist_ok=True) + output_file = paths.pixel_association_file( + output_path, + partition["Norder"], + partition["Npix"], + partition["join_Norder"], + partition["join_Npix"], + ) + table = pq.read_table(input_dir) + rows_written = len(table) + + if rows_written != partition["num_rows"]: + raise ValueError( + "Unexpected number of objects ", + f" Expected {partition['num_rows']}, wrote {rows_written}", + ) + + table.to_pandas().set_index("primary_hipscat_index").sort_index().to_parquet( + output_file + ) + + return data_frame["num_rows"].sum() diff --git a/src/hipscat_import/association/run_association.py b/src/hipscat_import/association/run_association.py new file mode 100644 index 00000000..0e713254 --- /dev/null +++ b/src/hipscat_import/association/run_association.py @@ -0,0 +1,50 @@ +"""Create partitioned association table between two catalogs +using dask dataframes for parallelization + +Methods in this file set up a dask pipeline using dataframes. +The actual logic of the map reduce is in the `map_reduce.py` file. +""" + +from hipscat.io import file_io, write_metadata +from tqdm import tqdm + +from hipscat_import.association.arguments import AssociationArguments +from hipscat_import.association.map_reduce import map_association, reduce_association + + +def _validate_args(args): + if not args: + raise TypeError("args is required and should be type AssociationArguments") + if not isinstance(args, AssociationArguments): + raise TypeError("args must be type AssociationArguments") + + +def run(args): + """Run the association pipeline""" + _validate_args(args) + + with tqdm(total=1, desc="Mapping ", disable=not args.progress_bar) as step_progress: + map_association(args) + step_progress.update(1) + + rows_written = 0 + with tqdm( + total=1, desc="Reducing ", disable=not args.progress_bar + ) as step_progress: + rows_written = reduce_association(args.tmp_path, args.catalog_path) + step_progress.update(1) + + # All done - write out the metadata + with tqdm( + total=4, desc="Finishing", disable=not args.progress_bar + ) as step_progress: + catalog_params = args.to_catalog_parameters() + catalog_params.total_rows = int(rows_written) + write_metadata.write_provenance_info(catalog_params, args.provenance_info()) + step_progress.update(1) + write_metadata.write_catalog_info(catalog_params) + step_progress.update(1) + write_metadata.write_parquet_metadata(args.catalog_path) + step_progress.update(1) + file_io.remove_directory(args.tmp_path, ignore_errors=True) + step_progress.update(1) diff --git a/tests/hipscat_import/association/test_association_argument.py b/tests/hipscat_import/association/test_association_argument.py new file mode 100644 index 00000000..b14c785a --- /dev/null +++ b/tests/hipscat_import/association/test_association_argument.py @@ -0,0 +1,214 @@ +"""Tests of argument validation, in the absense of command line parsing""" + + +import pytest + +from hipscat_import.association.arguments import AssociationArguments + +# pylint: disable=protected-access + + 
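For orientation before the argument-validation tests, here is a minimal sketch of how the new pipeline is meant to be driven, using only the AssociationArguments / run API added above. The catalog paths and column names are hypothetical placeholders, not part of this change.

from hipscat_import.association import AssociationArguments, run

# Hypothetical inputs: any two previously-imported hipscat catalogs that
# share a key (here, an object catalog and a source catalog).
args = AssociationArguments(
    primary_input_catalog_path="/data/hipscat/my_objects",
    primary_id_column="id",          # identifying column in the primary catalog
    primary_join_column="id",        # join predicate on the primary side
    join_input_catalog_path="/data/hipscat/my_sources",
    join_id_column="source_id",      # identifying column in the join catalog
    join_foreign_key="object_id",    # column that points back at the primary key
    output_path="/data/hipscat",
    output_catalog_name="object_to_source",
)
run(args)  # map shards to tmp_path, reduce per partition pair, write metadata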
+def test_none(): + """No arguments provided. Should error for required args.""" + with pytest.raises(ValueError): + AssociationArguments() + + +def test_empty_required(tmp_path, small_sky_object_catalog): + """All non-runtime arguments are required.""" + ## primary_input_catalog_path is missing + with pytest.raises(ValueError, match="primary_input_catalog_path"): + AssociationArguments( + primary_input_catalog_path=None, ## empty + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + ) + + with pytest.raises(ValueError, match="primary_id_column"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="", ## empty + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="primary_join_column"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="", ## empty + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="join_input_catalog_path"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path="", ## empty + join_id_column="id", + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="join_id_column"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="", ## empty + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="join_foreign_key"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="", ## empty + output_path=tmp_path, + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="output_path"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_path="", ## empty + output_catalog_name="small_sky_self_join", + overwrite=True, + ) + + with pytest.raises(ValueError, match="output_catalog_name"): + AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_path=tmp_path, + output_catalog_name="", ## empty + overwrite=True, + ) + + +def test_column_names(tmp_path, small_sky_object_catalog): + """Test validation of column names.""" + with pytest.raises(ValueError, match="primary_id_column"): + 
AssociationArguments(
+            primary_input_catalog_path=small_sky_object_catalog,
+            primary_id_column="primary_id", ## reserved column name
+            primary_join_column="id",
+            join_input_catalog_path=small_sky_object_catalog,
+            join_id_column="id",
+            join_foreign_key="id",
+            output_path=tmp_path,
+            output_catalog_name="bad_columns",
+            overwrite=True,
+        )
+
+    with pytest.raises(ValueError, match="join_id_column"):
+        AssociationArguments(
+            primary_input_catalog_path=small_sky_object_catalog,
+            primary_id_column="id",
+            primary_join_column="id",
+            join_input_catalog_path=small_sky_object_catalog,
+            join_id_column="primary_id", ## reserved column name
+            join_foreign_key="id",
+            output_path=tmp_path,
+            output_catalog_name="bad_columns",
+            overwrite=True,
+        )
+
+
+def test_compute_partition_size(tmp_path, small_sky_object_catalog):
+    """Test validation of compute_partition_size."""
+    with pytest.raises(ValueError, match="compute_partition_size"):
+        AssociationArguments(
+            primary_input_catalog_path=small_sky_object_catalog,
+            primary_id_column="id",
+            primary_join_column="id",
+            join_input_catalog_path=small_sky_object_catalog,
+            join_id_column="id",
+            join_foreign_key="id",
+            output_path=tmp_path,
+            output_catalog_name="bad_columns",
+            compute_partition_size=10, ## below the 100_000 minimum
+            overwrite=True,
+        )
+
+
+def test_all_required_args(tmp_path, small_sky_object_catalog):
+    """Required arguments are provided."""
+    AssociationArguments(
+        primary_input_catalog_path=small_sky_object_catalog,
+        primary_id_column="id",
+        primary_join_column="id",
+        join_input_catalog_path=small_sky_object_catalog,
+        join_id_column="id",
+        join_foreign_key="id",
+        output_path=tmp_path,
+        output_catalog_name="small_sky_self_join",
+    )
+
+
+def test_to_catalog_parameters(small_sky_object_catalog, tmp_path):
+    """Verify creation of catalog parameters for the association table to be created."""
+    args = AssociationArguments(
+        primary_input_catalog_path=small_sky_object_catalog,
+        primary_id_column="id",
+        primary_join_column="id",
+        join_input_catalog_path=small_sky_object_catalog,
+        join_id_column="id",
+        join_foreign_key="id",
+        output_path=tmp_path,
+        output_catalog_name="small_sky_self_join",
+    )
+    catalog_parameters = args.to_catalog_parameters()
+    assert catalog_parameters.catalog_name == args.output_catalog_name
+
+
+def test_provenance_info(small_sky_object_catalog, tmp_path):
+    """Verify that provenance info includes association-specific fields."""
+    args = AssociationArguments(
+        primary_input_catalog_path=small_sky_object_catalog,
+        primary_id_column="id",
+        primary_join_column="id",
+        join_input_catalog_path=small_sky_object_catalog,
+        join_id_column="id",
+        join_foreign_key="id",
+        output_path=tmp_path,
+        output_catalog_name="small_sky_self_join",
+    )
+
+    runtime_args = args.provenance_info()["runtime_args"]
+    assert "primary_input_catalog_path" in runtime_args
diff --git a/tests/hipscat_import/association/test_association_map_reduce.py b/tests/hipscat_import/association/test_association_map_reduce.py
new file mode 100644
index 00000000..e24fd8ee
--- /dev/null
+++ b/tests/hipscat_import/association/test_association_map_reduce.py
@@ -0,0 +1,176 @@
+"""Test behavior of map reduce methods in association."""
+
+import json
+import os
+
+import pandas as pd
+import pytest
+
+import hipscat_import.catalog.run_import as runner
+from hipscat_import.association.arguments import AssociationArguments
+from hipscat_import.association.map_reduce import (map_association,
+                                                   reduce_association)
+from hipscat_import.catalog.arguments import ImportArguments
+
+
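The tests below exercise the two halves of the pipeline separately, so a short sketch of that calling pattern may help; here `args` is assumed to be a fully-populated AssociationArguments instance like the ones in the previous test module.

# map_association writes <tmp_path>/partitions.csv plus hive-partitioned
# parquet shards keyed on (Norder, Dir, Npix, join_Norder, join_Dir, join_Npix).
map_association(args)

# reduce_association collates the shards into one parquet file per partition
# pair, writes partition_join_info.csv, and returns the total row count that
# run() later records as total_rows in the catalog metadata.
total_rows = reduce_association(input_path=args.tmp_path, output_path=args.catalog_path)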
+@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.timeout(15) +def test_map_association( + dask_client, tmp_path, formats_headers_csv, small_sky_object_catalog +): + """Test association with partially-overlapping dataset. + + This has the added benefit of testing a freshly-minted catalog as input.""" + args = ImportArguments( + output_catalog_name="subset_catalog", + input_file_list=[formats_headers_csv], + input_format="csv", + output_path=tmp_path, + ra_column="ra_mean", + dec_column="dec_mean", + id_column="object_id", + dask_tmp=tmp_path, + highest_healpix_order=1, + progress_bar=False, + ) + subset_catalog_path = args.catalog_path + + runner.run_with_client(args, dask_client) + + with open( + os.path.join(subset_catalog_path, "catalog_info.json"), "r", encoding="utf-8" + ) as metadata_info: + metadata_keywords = json.load(metadata_info) + assert metadata_keywords["total_rows"] == 8 + + args = AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=subset_catalog_path, + output_catalog_name="association_inner", + join_foreign_key="object_id", + join_id_column="object_id", + output_path=tmp_path, + progress_bar=False, + ) + + map_association(args) + intermediate_partitions_file = os.path.join( + args.catalog_path, "intermediate", "partitions.csv" + ) + data_frame = pd.read_csv(intermediate_partitions_file) + assert data_frame["primary_hipscat_index"].sum() == 8 + + +def test_reduce_bad_inputs(tmp_path, assert_text_file_matches): + """Test reducing with corrupted input.""" + + input_path = os.path.join(tmp_path, "incomplete_inputs") + os.makedirs(input_path, exist_ok=True) + + output_path = os.path.join(tmp_path, "output") + os.makedirs(output_path, exist_ok=True) + + ## We don't even have a partitions file + with pytest.raises(FileNotFoundError): + reduce_association(input_path=input_path, output_path=output_path) + + ## Create a partitions file, but it doesn't have the right columns + partitions_data = pd.DataFrame( + data=[[700, 282.5, -58.5], [701, 299.5, -48.5]], + columns=["id", "ra", "dec"], + ) + partitions_csv_file = os.path.join(input_path, "partitions.csv") + partitions_data.to_csv(partitions_csv_file, index=False) + + with pytest.raises(KeyError, match="primary_hipscat_index"): + reduce_association(input_path=input_path, output_path=output_path) + + ## Create a partitions file, but it doesn't have corresponding parquet data. + partitions_data = pd.DataFrame( + data=[[0, 0, 11, 0, 0, 11, 131]], + columns=[ + "Norder", + "Dir", + "Npix", + "join_Norder", + "join_Dir", + "join_Npix", + "primary_hipscat_index", + ], + ) + partitions_data.to_csv(partitions_csv_file, index=False) + + with pytest.raises(FileNotFoundError): + reduce_association(input_path=input_path, output_path=output_path) + + ## We still wrote out the partition info file, though! + expected_lines = [ + "Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows", + "0,0,11,0,0,11,131", + ] + metadata_filename = os.path.join(output_path, "partition_join_info.csv") + assert_text_file_matches(expected_lines, metadata_filename) + + +def test_reduce_bad_expectation(tmp_path): + """Test reducing with corrupted input.""" + input_path = os.path.join(tmp_path, "incomplete_inputs") + os.makedirs(input_path, exist_ok=True) + + output_path = os.path.join(tmp_path, "output") + os.makedirs(output_path, exist_ok=True) + + ## Create a partitions file, and a parquet file with not-enough rows. 
+ partitions_data = pd.DataFrame( + data=[[0, 0, 11, 0, 0, 11, 3]], + columns=[ + "Norder", + "Dir", + "Npix", + "join_Norder", + "join_Dir", + "join_Npix", + "primary_hipscat_index", + ], + ) + partitions_csv_file = os.path.join(input_path, "partitions.csv") + partitions_data.to_csv(partitions_csv_file, index=False) + + parquet_dir = os.path.join( + input_path, + "Norder=0", + "Dir=0", + "Npix=11", + "join_Norder=0", + "join_Dir=0", + "join_Npix=11", + ) + os.makedirs(parquet_dir, exist_ok=True) + + parquet_data = pd.DataFrame( + data=[[700, 7_000_000, 800, 8_000_000], [701, 7_000_100, 801, 8_001_000]], + columns=[ + "primary_id", + "primary_hipscat_index", + "join_id", + "join_hipscat_index", + ], + ) + parquet_data.to_parquet(os.path.join(parquet_dir, "part0.parquet")) + with pytest.raises(ValueError, match="Unexpected"): + reduce_association(input_path=input_path, output_path=output_path) + + ## Add one more row in another file, and the expectation is met. + parquet_data = pd.DataFrame( + data=[[702, 7_002_000, 802, 8_002_000]], + columns=[ + "primary_id", + "primary_hipscat_index", + "join_id", + "join_hipscat_index", + ], + ) + parquet_data.to_parquet(os.path.join(parquet_dir, "part1.parquet")) + reduce_association(input_path=input_path, output_path=output_path) diff --git a/tests/hipscat_import/association/test_run_association.py b/tests/hipscat_import/association/test_run_association.py new file mode 100644 index 00000000..d9f0ed88 --- /dev/null +++ b/tests/hipscat_import/association/test_run_association.py @@ -0,0 +1,259 @@ +"""test stuff.""" + +import os + +import numpy as np +import numpy.testing as npt +import pandas as pd +import pytest + +import hipscat_import.association.run_association as runner +from hipscat_import.association.arguments import AssociationArguments + + +def test_empty_args(): + """Runner should fail with empty arguments""" + with pytest.raises(TypeError): + runner.run(None) + + +def test_bad_args(): + """Runner should fail with mis-typed arguments""" + args = {"output_catalog_name": "bad_arg_type"} + with pytest.raises(TypeError): + runner.run(args) + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.timeout(15) +def test_object_to_source( + small_sky_object_catalog, + small_sky_source_catalog, + tmp_path, + assert_text_file_matches, +): + """test stuff""" + + args = AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_source_catalog, + output_catalog_name="small_sky_association", + join_id_column="source_id", + join_foreign_key="object_id", + output_path=tmp_path, + progress_bar=False, + ) + runner.run(args) + + # Check that the catalog metadata file exists + expected_metadata_lines = [ + "{", + ' "catalog_name": "small_sky_association",', + ' "catalog_type": "association",', + ' "epoch": "J2000",', + ' "ra_kw": "ra",', + ' "dec_kw": "dec",', + ' "total_rows": 17161', + "}", + ] + metadata_filename = os.path.join(args.catalog_path, "catalog_info.json") + assert_text_file_matches(expected_metadata_lines, metadata_filename) + + # Check that the partition *join* info file exists + expected_lines = [ + "Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows", + "0,0,11,0,0,4,50", + "0,0,11,1,0,47,2395", + "0,0,11,2,0,176,385", + "0,0,11,2,0,177,1510", + "0,0,11,2,0,178,1634", + "0,0,11,2,0,179,1773", + "0,0,11,2,0,180,655", + "0,0,11,2,0,181,903", + "0,0,11,2,0,182,1246", + "0,0,11,2,0,183,1143", + "0,0,11,2,0,184,1390", + 
"0,0,11,2,0,185,2942", + "0,0,11,2,0,186,452", + "0,0,11,2,0,187,683", + ] + metadata_filename = os.path.join(args.catalog_path, "partition_join_info.csv") + assert_text_file_matches(expected_lines, metadata_filename) + + ## Test one pixel that will have 50 rows in it. + output_file = os.path.join( + tmp_path, + "small_sky_association", + "Norder=0", + "Dir=0", + "Npix=11", + "join_Norder=0", + "join_Dir=0", + "join_Npix=4.parquet", + ) + assert os.path.exists(output_file), f"file not found [{output_file}]" + data_frame = pd.read_parquet(output_file, engine="pyarrow") + npt.assert_array_equal( + data_frame.columns, + ["primary_id", "join_id", "join_hipscat_index"], + ) + assert data_frame.index.name == "primary_hipscat_index" + assert len(data_frame) == 50 + ids = data_frame["primary_id"] + assert np.logical_and(ids >= 700, ids < 832).all() + ids = data_frame["join_id"] + assert np.logical_and(ids >= 70_000, ids < 87161).all() + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.timeout(15) +def test_source_to_object( + small_sky_object_catalog, + small_sky_source_catalog, + tmp_path, + assert_text_file_matches, +): + """test stuff""" + + args = AssociationArguments( + primary_input_catalog_path=small_sky_source_catalog, + primary_id_column="source_id", + primary_join_column="object_id", + join_input_catalog_path=small_sky_object_catalog, + join_id_column="id", + join_foreign_key="id", + output_catalog_name="small_sky_association", + output_path=tmp_path, + progress_bar=False, + ) + runner.run(args) + + # Check that the catalog metadata file exists + expected_metadata_lines = [ + "{", + ' "catalog_name": "small_sky_association",', + ' "catalog_type": "association",', + ' "epoch": "J2000",', + ' "ra_kw": "ra",', + ' "dec_kw": "dec",', + ' "total_rows": 17161', + "}", + ] + metadata_filename = os.path.join(args.catalog_path, "catalog_info.json") + assert_text_file_matches(expected_metadata_lines, metadata_filename) + + # Check that the partition *join* info file exists + expected_lines = [ + "Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows", + "0,0,4,0,0,11,50", + "1,0,47,0,0,11,2395", + "2,0,176,0,0,11,385", + "2,0,177,0,0,11,1510", + "2,0,178,0,0,11,1634", + "2,0,179,0,0,11,1773", + "2,0,180,0,0,11,655", + "2,0,181,0,0,11,903", + "2,0,182,0,0,11,1246", + "2,0,183,0,0,11,1143", + "2,0,184,0,0,11,1390", + "2,0,185,0,0,11,2942", + "2,0,186,0,0,11,452", + "2,0,187,0,0,11,683", + ] + metadata_filename = os.path.join(args.catalog_path, "partition_join_info.csv") + assert_text_file_matches(expected_lines, metadata_filename) + + ## Test one pixel that will have 50 rows in it. 
+ output_file = os.path.join( + tmp_path, + "small_sky_association", + "Norder=0", + "Dir=0", + "Npix=4", + "join_Norder=0", + "join_Dir=0", + "join_Npix=11.parquet", + ) + assert os.path.exists(output_file), f"file not found [{output_file}]" + data_frame = pd.read_parquet(output_file, engine="pyarrow") + npt.assert_array_equal( + data_frame.columns, + ["primary_id", "join_id", "join_hipscat_index"], + ) + assert data_frame.index.name == "primary_hipscat_index" + assert len(data_frame) == 50 + ids = data_frame["primary_id"] + assert np.logical_and(ids >= 70_000, ids < 87161).all() + ids = data_frame["join_id"] + assert np.logical_and(ids >= 700, ids < 832).all() + + +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +@pytest.mark.timeout(15) +def test_self_join( + small_sky_object_catalog, + tmp_path, + assert_text_file_matches, +): + """test stuff""" + + args = AssociationArguments( + primary_input_catalog_path=small_sky_object_catalog, + primary_id_column="id", + primary_join_column="id", + join_input_catalog_path=small_sky_object_catalog, + output_catalog_name="small_sky_self_association", + join_foreign_key="id", + join_id_column="id", + output_path=tmp_path, + progress_bar=False, + ) + runner.run(args) + + # Check that the catalog metadata file exists + expected_metadata_lines = [ + "{", + ' "catalog_name": "small_sky_self_association",', + ' "catalog_type": "association",', + ' "epoch": "J2000",', + ' "ra_kw": "ra",', + ' "dec_kw": "dec",', + ' "total_rows": 131', + "}", + ] + metadata_filename = os.path.join(args.catalog_path, "catalog_info.json") + assert_text_file_matches(expected_metadata_lines, metadata_filename) + + # Check that the partition *join* info file exists + expected_lines = [ + "Norder,Dir,Npix,join_Norder,join_Dir,join_Npix,num_rows", + "0,0,11,0,0,11,131", + ] + metadata_filename = os.path.join(args.catalog_path, "partition_join_info.csv") + assert_text_file_matches(expected_lines, metadata_filename) + + ## Test one pixel that will have 50 rows in it. 
+ output_file = os.path.join( + tmp_path, + "small_sky_self_association", + "Norder=0", + "Dir=0", + "Npix=11", + "join_Norder=0", + "join_Dir=0", + "join_Npix=11.parquet", + ) + assert os.path.exists(output_file), f"file not found [{output_file}]" + data_frame = pd.read_parquet(output_file, engine="pyarrow") + npt.assert_array_equal( + data_frame.columns, + ["primary_id", "join_id", "join_hipscat_index"], + ) + assert data_frame.index.name == "primary_hipscat_index" + assert len(data_frame) == 131 + ids = data_frame["primary_id"] + assert np.logical_and(ids >= 700, ids < 832).all() + ids = data_frame["join_id"] + assert np.logical_and(ids >= 700, ids < 832).all() diff --git a/tests/hipscat_import/conftest.py b/tests/hipscat_import/conftest.py index d7d7ce8c..38328854 100644 --- a/tests/hipscat_import/conftest.py +++ b/tests/hipscat_import/conftest.py @@ -1,3 +1,5 @@ +"""Fixtures for testing import tool actions.""" + import os import re @@ -7,7 +9,7 @@ from dask.distributed import Client -@pytest.fixture(scope="package", name="dask_client") +@pytest.fixture(scope="session", name="dask_client") def dask_client(): """Create a single client for use by all unit test cases.""" client = Client() @@ -34,11 +36,21 @@ def small_sky_single_file(test_data_dir): return os.path.join(test_data_dir, "small_sky", "catalog.csv") +@pytest.fixture +def small_sky_object_catalog(test_data_dir): + return os.path.join(test_data_dir, "small_sky_object_catalog") + + @pytest.fixture def small_sky_source_dir(test_data_dir): return os.path.join(test_data_dir, "small_sky_source") +@pytest.fixture +def small_sky_source_catalog(test_data_dir): + return os.path.join(test_data_dir, "small_sky_source_catalog") + + @pytest.fixture def blank_data_dir(test_data_dir): return os.path.join(test_data_dir, "blank") diff --git a/tests/hipscat_import/data/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet b/tests/hipscat_import/data/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet new file mode 100644 index 00000000..7aed5e2b Binary files /dev/null and b/tests/hipscat_import/data/small_sky_object_catalog/Norder=0/Dir=0/Npix=11.parquet differ diff --git a/tests/hipscat_import/data/small_sky_object_catalog/_common_metadata b/tests/hipscat_import/data/small_sky_object_catalog/_common_metadata new file mode 100644 index 00000000..a1505a28 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_object_catalog/_common_metadata differ diff --git a/tests/hipscat_import/data/small_sky_object_catalog/_metadata b/tests/hipscat_import/data/small_sky_object_catalog/_metadata new file mode 100644 index 00000000..63b03188 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_object_catalog/_metadata differ diff --git a/tests/hipscat_import/data/small_sky_object_catalog/catalog_info.json b/tests/hipscat_import/data/small_sky_object_catalog/catalog_info.json new file mode 100644 index 00000000..451af75e --- /dev/null +++ b/tests/hipscat_import/data/small_sky_object_catalog/catalog_info.json @@ -0,0 +1,8 @@ +{ + "catalog_name": "small_sky_object_catalog", + "catalog_type": "object", + "epoch": "J2000", + "ra_kw": "ra", + "dec_kw": "dec", + "total_rows": 131 +} diff --git a/tests/hipscat_import/data/small_sky_object_catalog/partition_info.csv b/tests/hipscat_import/data/small_sky_object_catalog/partition_info.csv new file mode 100644 index 00000000..ed015721 --- /dev/null +++ b/tests/hipscat_import/data/small_sky_object_catalog/partition_info.csv @@ -0,0 +1,2 @@ +Norder,Dir,Npix,num_rows +0,0,11,131 diff --git 
a/tests/hipscat_import/data/small_sky_object_catalog/point_map.fits b/tests/hipscat_import/data/small_sky_object_catalog/point_map.fits new file mode 100644 index 00000000..e7287c9f Binary files /dev/null and b/tests/hipscat_import/data/small_sky_object_catalog/point_map.fits differ diff --git a/tests/hipscat_import/data/small_sky_object_catalog/provenance_info.json b/tests/hipscat_import/data/small_sky_object_catalog/provenance_info.json new file mode 100644 index 00000000..adfa1c96 --- /dev/null +++ b/tests/hipscat_import/data/small_sky_object_catalog/provenance_info.json @@ -0,0 +1,53 @@ +{ + "catalog_name": "small_sky_object_catalog", + "catalog_type": "object", + "version": "0.0.10.dev7+g0a79f90.d20230418", + "generation_date": "2023.04.20", + "epoch": "J2000", + "ra_kw": "ra", + "dec_kw": "dec", + "total_rows": 131, + "tool_args": { + "tool_name": "hipscat_import", + "version": "0.0.4.dev28+g2e31821.d20230420", + "runtime_args": { + "catalog_name": "small_sky_object_catalog", + "output_path": "/home/delucchi/git/association/tests/hipscat_import/data", + "output_catalog_name": "small_sky_object_catalog", + "tmp_dir": "", + "overwrite": true, + "dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0", + "dask_n_workers": 1, + "dask_threads_per_worker": 1, + "catalog_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_object_catalog", + "tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0/small_sky_object_catalog/intermediate", + "epoch": "J2000", + "catalog_type": "object", + "input_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts", + "input_paths": [ + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts/catalog_00_of_05.csv", + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts/catalog_01_of_05.csv", + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts/catalog_02_of_05.csv", + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts/catalog_03_of_05.csv", + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts/catalog_04_of_05.csv" + ], + "input_format": "csv", + "input_file_list": [], + "ra_column": "ra", + "dec_column": "dec", + "id_column": "id", + "highest_healpix_order": 1, + "pixel_threshold": 1000000, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "CsvReader", + "chunksize": 500000, + "header": "infer", + "schema_file": null, + "separator": ",", + "column_names": null, + "type_map": {} + } + } + } +} diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet new file mode 100644 index 00000000..d5c65653 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=0/Dir=0/Npix=4.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet new file mode 100644 index 00000000..cb0d36f7 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=1/Dir=0/Npix=47.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet new file mode 100644 index 00000000..2c4bbe6d Binary files /dev/null and 
b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=176.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet new file mode 100644 index 00000000..00ad8b93 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=177.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet new file mode 100644 index 00000000..6cd593a1 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=178.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet new file mode 100644 index 00000000..07b1b065 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=179.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet new file mode 100644 index 00000000..90b514cb Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=180.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet new file mode 100644 index 00000000..b17baf4b Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=181.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet new file mode 100644 index 00000000..48e81ff8 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=182.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet new file mode 100644 index 00000000..13263678 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=183.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet new file mode 100644 index 00000000..6f56792f Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=184.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet new file mode 100644 index 00000000..a96dfbef Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=185.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet new file mode 100644 index 00000000..3531aa05 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=186.parquet differ 
diff --git a/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet new file mode 100644 index 00000000..ec824363 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/Norder=2/Dir=0/Npix=187.parquet differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/_common_metadata b/tests/hipscat_import/data/small_sky_source_catalog/_common_metadata new file mode 100644 index 00000000..982281c9 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/_common_metadata differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/_metadata b/tests/hipscat_import/data/small_sky_source_catalog/_metadata new file mode 100644 index 00000000..bfd79a20 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/_metadata differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/catalog_info.json b/tests/hipscat_import/data/small_sky_source_catalog/catalog_info.json new file mode 100644 index 00000000..810cb2fa --- /dev/null +++ b/tests/hipscat_import/data/small_sky_source_catalog/catalog_info.json @@ -0,0 +1,8 @@ +{ + "catalog_name": "small_sky_source_catalog", + "catalog_type": "source", + "epoch": "J2000", + "ra_kw": "source_ra", + "dec_kw": "source_dec", + "total_rows": 17161 +} diff --git a/tests/hipscat_import/data/small_sky_source_catalog/partition_info.csv b/tests/hipscat_import/data/small_sky_source_catalog/partition_info.csv new file mode 100644 index 00000000..7a5f4e9f --- /dev/null +++ b/tests/hipscat_import/data/small_sky_source_catalog/partition_info.csv @@ -0,0 +1,15 @@ +Norder,Dir,Npix,num_rows +0,0,4,50 +1,0,47,2395 +2,0,176,385 +2,0,177,1510 +2,0,178,1634 +2,0,179,1773 +2,0,180,655 +2,0,181,903 +2,0,182,1246 +2,0,183,1143 +2,0,184,1390 +2,0,185,2942 +2,0,186,452 +2,0,187,683 diff --git a/tests/hipscat_import/data/small_sky_source_catalog/point_map.fits b/tests/hipscat_import/data/small_sky_source_catalog/point_map.fits new file mode 100644 index 00000000..e0ac82b9 Binary files /dev/null and b/tests/hipscat_import/data/small_sky_source_catalog/point_map.fits differ diff --git a/tests/hipscat_import/data/small_sky_source_catalog/provenance_info.json b/tests/hipscat_import/data/small_sky_source_catalog/provenance_info.json new file mode 100644 index 00000000..c7711184 --- /dev/null +++ b/tests/hipscat_import/data/small_sky_source_catalog/provenance_info.json @@ -0,0 +1,49 @@ +{ + "catalog_name": "small_sky_source_catalog", + "catalog_type": "source", + "version": "0.0.10.dev7+g0a79f90.d20230418", + "generation_date": "2023.04.20", + "epoch": "J2000", + "ra_kw": "source_ra", + "dec_kw": "source_dec", + "total_rows": 17161, + "tool_args": { + "tool_name": "hipscat_import", + "version": "0.0.4.dev28+g2e31821.d20230420", + "runtime_args": { + "catalog_name": "small_sky_source_catalog", + "output_path": "/home/delucchi/git/association/tests/hipscat_import/data", + "output_catalog_name": "small_sky_source_catalog", + "tmp_dir": "", + "overwrite": true, + "dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0", + "dask_n_workers": 1, + "dask_threads_per_worker": 1, + "catalog_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_source_catalog", + "tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0/small_sky_source_catalog/intermediate", + "epoch": "J2000", + "catalog_type": "source", + "input_path": 
"/home/delucchi/git/association/tests/hipscat_import/data/small_sky_source", + "input_paths": [ + "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_source/small_sky_source.csv" + ], + "input_format": "csv", + "input_file_list": [], + "ra_column": "source_ra", + "dec_column": "source_dec", + "id_column": "source_id", + "highest_healpix_order": 2, + "pixel_threshold": 3000, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "CsvReader", + "chunksize": 500000, + "header": "infer", + "schema_file": null, + "separator": ",", + "column_names": null, + "type_map": {} + } + } + } +}