Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of association pipeline. #69

Merged
merged 5 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/hipscat_import/association/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class AssociationArguments(RuntimeArguments):
join_id_column: str = ""
join_foreign_key: str = ""

join_how: str = "inner"
compute_partition_size: int = 1_000_000_000

def __post_init__(self):
Expand All @@ -49,9 +48,6 @@ def _check_arguments(self):
if self.join_id_column in ["primary_id", "join_id"]:
raise ValueError("join_id_column uses a reserved column name")

if self.join_how not in ["left", "right", "outer", "inner"]:
raise ValueError("join_how must be one of left, right, outer, or inner")

if self.compute_partition_size < 100_000:
raise ValueError("compute_partition_size must be at least 100_000")

Expand Down
28 changes: 12 additions & 16 deletions src/hipscat_import/association/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def map_association(args):
## NB: We may be joining on a column that is NOT the natural primary key.
single_primary_column = args.primary_id_column == args.primary_join_column
read_columns = [
"_hipscat_index",
"Norder",
"Dir",
"Npix",
Expand All @@ -48,14 +47,15 @@ def map_association(args):
}
if not single_primary_column:
rename_columns[args.primary_id_column] = "primary_id"
primary_index = primary_index.rename(columns=rename_columns).set_index(
"primary_join"
primary_index = (
primary_index.reset_index()
.rename(columns=rename_columns)
.set_index("primary_join")
)

## Read and massage join input data
single_join_column = args.join_id_column == args.join_foreign_key
read_columns = [
"_hipscat_index",
delucchi-cmu marked this conversation as resolved.
Show resolved Hide resolved
"Norder",
"Dir",
"Npix",
Expand All @@ -82,27 +82,23 @@ def map_association(args):
}
if not single_join_column:
rename_columns[args.join_id_column] = "join_id"
join_index = join_index.rename(columns=rename_columns).set_index("join_to_primary")
join_index = (
join_index.reset_index()
.rename(columns=rename_columns)
.set_index("join_to_primary")
)

## Join the two data sets on the shared join predicate.
join_data = primary_index.merge(
join_index, how=args.join_how, left_index=True, right_index=True
).reset_index()
join_index, how="inner", left_index=True, right_index=True
delucchi-cmu marked this conversation as resolved.
Show resolved Hide resolved
)

## Write out a summary of each partition join
groups = (
join_data.groupby(
["Norder", "Dir", "Npix", "join_Norder", "join_Dir", "join_Npix"],
group_keys=False,
)[
"Norder",
"Dir",
"Npix",
"join_Norder",
"join_Dir",
"join_Npix",
"primary_hipscat_index",
]
)["primary_hipscat_index"]
.count()
.compute()
)
Expand Down
31 changes: 0 additions & 31 deletions tests/hipscat_import/association/test_association_argument.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,6 @@ def test_empty_required(tmp_path, small_sky_object_catalog):
overwrite=True,
)

with pytest.raises(ValueError, match="join_how"):
AssociationArguments(
primary_input_catalog_path=small_sky_object_catalog,
primary_id_column="id",
primary_join_column="id",
join_input_catalog_path=small_sky_object_catalog,
join_id_column="id",
join_foreign_key="id",
output_path=tmp_path,
output_catalog_name="small_sky_self_join",
join_how="", ## empty
overwrite=True,
)


def test_column_names(tmp_path, small_sky_object_catalog):
"""Test validation of column names."""
Expand Down Expand Up @@ -164,23 +150,6 @@ def test_column_names(tmp_path, small_sky_object_catalog):
)


def test_join_how(tmp_path, small_sky_object_catalog):
    """An unsupported ``join_how`` value is rejected with a ValueError."""
    bad_join_kwargs = dict(
        primary_input_catalog_path=small_sky_object_catalog,
        primary_id_column="id",
        primary_join_column="id",
        join_input_catalog_path=small_sky_object_catalog,
        join_id_column="id",
        join_foreign_key="id",
        output_path=tmp_path,
        output_catalog_name="bad_columns",
        join_how="middle",  ## not a valid join option
        overwrite=True,
    )
    ## The error message is expected to mention the offending argument.
    with pytest.raises(ValueError, match="join_how"):
        AssociationArguments(**bad_join_kwargs)


def test_compute_partition_size(tmp_path, small_sky_object_catalog):
"""Test validation of compute_partition_size."""
with pytest.raises(ValueError, match="compute_partition_size"):
Expand Down
57 changes: 56 additions & 1 deletion tests/hipscat_import/association/test_association_map_reduce.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,66 @@
"""Test behavior of map reduce methods in association."""

import json
import os

import pandas as pd
import pytest

from hipscat_import.association.map_reduce import reduce_association
import hipscat_import.catalog.run_import as runner
from hipscat_import.association.arguments import AssociationArguments
from hipscat_import.association.map_reduce import (map_association,
reduce_association)
from hipscat_import.catalog.arguments import ImportArguments


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
@pytest.mark.timeout(15)
def test_map_association(
    dask_client, tmp_path, formats_headers_csv, small_sky_object_catalog
):
    """Test the association map stage against a partially-overlapping dataset.

    The join-side catalog is imported from ``formats_headers_csv`` inside this
    test, so a freshly-minted catalog is exercised as association input too.
    """
    ## Step 1: import the CSV file into a new catalog to use as the join side.
    import_args = ImportArguments(
        output_catalog_name="subset_catalog",
        input_file_list=[formats_headers_csv],
        input_format="csv",
        output_path=tmp_path,
        ra_column="ra_mean",
        dec_column="dec_mean",
        id_column="object_id",
        dask_tmp=tmp_path,
        highest_healpix_order=1,
        progress_bar=False,
    )
    subset_catalog_path = import_args.catalog_path
    runner.run_with_client(import_args, dask_client)

    ## Step 2: the new catalog's catalog_info.json should report 8 total rows.
    catalog_info_path = os.path.join(subset_catalog_path, "catalog_info.json")
    with open(catalog_info_path, "r", encoding="utf-8") as catalog_info_file:
        catalog_info = json.load(catalog_info_file)
    assert catalog_info["total_rows"] == 8

    ## Step 3: associate the object catalog with the freshly imported subset.
    association_args = AssociationArguments(
        primary_input_catalog_path=small_sky_object_catalog,
        primary_id_column="id",
        primary_join_column="id",
        join_input_catalog_path=subset_catalog_path,
        output_catalog_name="association_inner",
        join_foreign_key="object_id",
        join_id_column="object_id",
        output_path=tmp_path,
        progress_bar=False,
    )
    map_association(association_args)

    ## Step 4: the ``primary_hipscat_index`` column of the intermediate
    ## partitions.csv summary should sum to 8.
    partitions_csv_path = os.path.join(
        association_args.catalog_path, "intermediate", "partitions.csv"
    )
    partition_summary = pd.read_csv(partitions_csv_path)
    assert partition_summary["primary_hipscat_index"].sum() == 8


def test_reduce_bad_inputs(tmp_path, assert_text_file_matches):
Expand Down
2 changes: 2 additions & 0 deletions tests/hipscat_import/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def small_sky_single_file(test_data_dir):
def small_sky_object_catalog(test_data_dir):
return os.path.join(test_data_dir, "small_sky_object_catalog")


@pytest.fixture
def small_sky_source_dir(test_data_dir):
    """Path to the ``small_sky_source`` entry under ``test_data_dir``."""
    source_dir = os.path.join(test_data_dir, "small_sky_source")
    return source_dir
Expand All @@ -49,6 +50,7 @@ def small_sky_source_dir(test_data_dir):
def small_sky_source_catalog(test_data_dir):
return os.path.join(test_data_dir, "small_sky_source_catalog")


@pytest.fixture
def blank_data_dir(test_data_dir):
    """Path to the ``blank`` entry under ``test_data_dir``."""
    blank_dir = os.path.join(test_data_dir, "blank")
    return blank_dir
Expand Down
Binary file not shown.
Binary file modified tests/hipscat_import/data/small_sky_object_catalog/_common_metadata
Binary file not shown.
Binary file modified tests/hipscat_import/data/small_sky_object_catalog/_metadata
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
"catalog_name": "small_sky_object_catalog",
"catalog_type": "object",
"version": "0.0.10.dev7+g0a79f90.d20230418",
"generation_date": "2023.04.18",
"generation_date": "2023.04.20",
"epoch": "J2000",
"ra_kw": "ra",
"dec_kw": "dec",
"total_rows": 131,
"tool_args": {
"tool_name": "hipscat_import",
"version": "0.0.4.dev17+gdbb1fdd",
"version": "0.0.4.dev28+g2e31821.d20230420",
"runtime_args": {
"catalog_name": "small_sky_object_catalog",
"output_path": "/home/delucchi/git/association/tests/hipscat_import/data",
"output_catalog_name": "small_sky_object_catalog",
"tmp_dir": "",
"overwrite": false,
"dask_tmp": "/tmp/pytest-of-delucchi/pytest-1117/test_dask_runner0",
"overwrite": true,
"dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0",
"dask_n_workers": 1,
"dask_threads_per_worker": 1,
"catalog_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_object_catalog",
"tmp_path": "/tmp/pytest-of-delucchi/pytest-1117/test_dask_runner0/small_sky_object_catalog/intermediate",
"tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner0/small_sky_object_catalog/intermediate",
"epoch": "J2000",
"catalog_type": "object",
"input_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_parts",
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/hipscat_import/data/small_sky_source_catalog/_common_metadata
Binary file not shown.
Binary file modified tests/hipscat_import/data/small_sky_source_catalog/_metadata
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
"catalog_name": "small_sky_source_catalog",
"catalog_type": "source",
"version": "0.0.10.dev7+g0a79f90.d20230418",
"generation_date": "2023.04.18",
"generation_date": "2023.04.20",
"epoch": "J2000",
"ra_kw": "source_ra",
"dec_kw": "source_dec",
"total_rows": 17161,
"tool_args": {
"tool_name": "hipscat_import",
"version": "0.0.4.dev17+gdbb1fdd",
"version": "0.0.4.dev28+g2e31821.d20230420",
"runtime_args": {
"catalog_name": "small_sky_source_catalog",
"output_path": "/home/delucchi/git/association/tests/hipscat_import/data",
"output_catalog_name": "small_sky_source_catalog",
"tmp_dir": "",
"overwrite": true,
"dask_tmp": "/tmp/pytest-of-delucchi/pytest-1119/test_dask_runner_source_table0",
"dask_tmp": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0",
"dask_n_workers": 1,
"dask_threads_per_worker": 1,
"catalog_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_source_catalog",
"tmp_path": "/tmp/pytest-of-delucchi/pytest-1119/test_dask_runner_source_table0/small_sky_source_catalog/intermediate",
"tmp_path": "/tmp/pytest-of-delucchi/pytest-1261/test_dask_runner_source_table0/small_sky_source_catalog/intermediate",
"epoch": "J2000",
"catalog_type": "source",
"input_path": "/home/delucchi/git/association/tests/hipscat_import/data/small_sky_source",
Expand Down