Refactor implementation for (skipping) validation #2

35 changes: 19 additions & 16 deletions pyam/_aggregate.py
@@ -13,6 +13,8 @@
    KNOWN_FUNCS,
    to_list,
)
from pyam._compare import _compare


logger = logging.getLogger(__name__)

@@ -70,23 +72,24 @@ def _aggregate_recursive(df, variable, recursive):
        components = compress(_df.variable, find_depth(_df.variable, level=d + 1))
        var_list = set([reduce_hierarchy(v, -1) for v in components])

        # collect already existing variables
        intermediate_var = var_list.intersection(set(_df.variable))

        # a temporary dataframe allows to distinguish between full data and new data
        temp_df = _df.aggregate(variable=var_list)
        # if intermediate variables exist, delete already existing entries in _data
        if intermediate_var:
            # index which doesn't exist in _df-index yet
            _index = temp_df._data.index.difference(_df._data.index)
            temp_df._data = temp_df._data[_index]
        _df.append(temp_df, inplace=True)
        # check consistency of intermediate variable
        if not recursive == "skip-validate":
            if intermediate_var:
                if _df.check_aggregate(intermediate_var):
                    raise ValueError("Aggregated data is inconsistent.")
        data_list.append(temp_df._data)
        _data_agg = _aggregate(_df, variable=var_list)

        # check if data for intermediate variables already exists
        _data_self = _df.filter(variable=var_list)._data
        _overlap = _data_agg.index.intersection(_data_self.index)
        _new = _data_agg.index.difference(_data_self.index)

        # assert that aggregated values are consistent with existing data (optional)
        if recursive != "skip-validate" and not _overlap.empty:
            conflict = _compare(_data_self, _data_agg[_overlap], "self", "aggregate")
            if not conflict.empty:
                msg = "Aggregated values are inconsistent with existing data:"
                raise ValueError(f"{msg}\n{conflict}")

        # append aggregated values that are not already in data
        _df.append(_data_agg[_new], inplace=True)
        data_list.append(_data_agg[_new])

    return pd.concat(data_list)

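Note: a minimal usage sketch of the refactored behaviour (the IamDataFrame `df` and its variable hierarchy are assumed for illustration, mirroring the fixture added in tests/conftest.py below). Aggregated values for intermediate variables are now compared against any pre-existing data via `_compare()`, only values not already present are appended, and `recursive="skip-validate"` bypasses the consistency check.

# `df` is an assumed IamDataFrame with a nested variable tree such as
# "Secondary Energy|Electricity|Wind|Offshore" (see tests/conftest.py below)

# aggregate bottom-up and validate against pre-existing intermediate variables (default)
df.aggregate(variable="Secondary Energy|Electricity", recursive=True, append=True)

# alternatively, skip the consistency check against pre-existing intermediate data
# df.aggregate(variable="Secondary Energy|Electricity", recursive="skip-validate", append=True)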
22 changes: 22 additions & 0 deletions pyam/_compare.py
@@ -0,0 +1,22 @@
import numpy as np
import pandas as pd


def _compare(
    left, right, left_label="left", right_label="right", drop_close=True, **kwargs
):
    """Internal implementation of comparison of IamDataFrames or pd.Series"""

    def as_series(s):
        return s if isinstance(s, pd.Series) else s._data

    ret = pd.merge(
        left=as_series(left).rename(index=left_label),
        right=as_series(right).rename(index=right_label),
        how="outer",
        left_index=True,
        right_index=True,
    )
    if drop_close:
        ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)]
    return ret
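A minimal sketch of how the new helper behaves when called with plain pd.Series (the toy index and values below are illustrative and not part of this change): rows where both values agree within the numpy.isclose tolerance are dropped, so only genuine conflicts are returned.

import pandas as pd
from pyam._compare import _compare

# illustrative two-entry index with one conflicting and one matching value
idx = pd.MultiIndex.from_tuples(
    [("Primary Energy", 2005), ("Primary Energy", 2010)], names=["variable", "year"]
)
left = pd.Series([1.0, 2.0], index=idx)
right = pd.Series([1.5, 2.0], index=idx)

# with the default `drop_close=True`, only the conflicting 2005 row remains
print(_compare(left, right, "left", "right"))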
15 changes: 3 additions & 12 deletions pyam/core.py
@@ -58,6 +58,7 @@
)
from pyam.read_ixmp import read_ix
from pyam.plotting import PlotAccessor, mpl_args_to_meta_cols
from pyam._compare import _compare
from pyam._aggregate import (
    _aggregate,
    _aggregate_region,
@@ -686,7 +687,7 @@ def swap_time_for_year(self, inplace=False):

        # assign data and other attributes
        ret._LONG_IDX = _index
        ret._data = _data.set_index(ret._LONG_IDX)
        ret._data = _data.set_index(ret._LONG_IDX).value
        ret.time_col = "year"
        ret._set_attributes()
        delattr(ret, "time")
@@ -2516,17 +2517,7 @@ def compare(
    kwargs : arguments for comparison of values
        passed to :func:`numpy.isclose`
    """
    ret = pd.concat(
        {
            right_label: right.data.set_index(right._LONG_IDX),
            left_label: left.data.set_index(left._LONG_IDX),
        },
        axis=1,
    )
    ret.columns = ret.columns.droplevel(1)
    if drop_close:
        ret = ret[~np.isclose(ret[left_label], ret[right_label], **kwargs)]
    return ret[[right_label, left_label]]
    return _compare(left, right, left_label, right_label, drop_close=True, **kwargs)


def concat(dfs, ignore_meta_conflict=False, **kwargs):
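The public compare() keeps its signature and still forwards kwargs to numpy.isclose; a short usage sketch (`df1` and `df2` are assumed IamDataFrames with overlapping timeseries data). Note that the returned columns now follow the order of the labels passed (left first, then right), which is why the expected values in tests/test_feature_compare.py below are swapped.

import pyam

# `df1` and `df2` are assumed IamDataFrames with overlapping timeseries data;
# rows where the values agree within the given tolerance are dropped
diff = pyam.compare(df1, df2, left_label="df1", right_label="df2", rtol=1e-3)
print(diff)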
30 changes: 30 additions & 0 deletions tests/conftest.py
@@ -105,6 +105,18 @@
)


RECURSIVE_DF = pd.DataFrame(
    [
        ["Secondary Energy|Electricity", "EJ/yr", 5, 19.0],
        ["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17],
        ["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5],
        ["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12],
        ["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2],
    ],
    columns=["variable", "unit"] + TEST_YEARS,
)


TEST_STACKPLOT_DF = pd.DataFrame(
    [
        ["World", "Emissions|CO2|Energy|Oil", "Mt CO2/yr", 2, 3.2, 2.0, 1.8],
@@ -210,6 +222,24 @@ def plot_df():
    yield df


# IamDataFrame with two scenarios and structure for recursive aggregation
@pytest.fixture(scope="function", params=["year", "datetime"])
def recursive_df(request):

    data = (
        RECURSIVE_DF
        if request.param == "year"
        else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns")
    )

    df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World")
    df2 = df.rename(scenario={"scen_a": "scen_b"})
    df2._data *= 2
    df.append(df2, inplace=True)

    yield df


@pytest.fixture(scope="session")
def plot_stackplot_df():
    df = IamDataFrame(TEST_STACKPLOT_DF)
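The new fixture data is internally consistent by construction, which the recursive-aggregation tests below rely on; a small sketch (assuming the RECURSIVE_DF above, with TEST_YEARS providing the two year columns):

from pyam import IamDataFrame

# consistency of RECURSIVE_DF by construction:
#   Wind = Offshore + Onshore   -> 1 + 4 = 5  and  5 + 12 = 17
#   Electricity = Wind + Solar  -> 5 (Solar is nan)  and  17 + 2 = 19
df = IamDataFrame(RECURSIVE_DF, model="model_a", scenario="scen_a", region="World")
# check_aggregate() is expected to return None (no inconsistencies) here
assert df.check_aggregate("Secondary Energy|Electricity|Wind") is None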
60 changes: 19 additions & 41 deletions tests/test_feature_aggregate.py
@@ -42,17 +42,6 @@
    columns=LONG_IDX + ["value"],
)

RECURSIVE_DF = pd.DataFrame(
    [
        ["Secondary Energy|Electricity", "EJ/yr", 5, 19.0],
        ["Secondary Energy|Electricity|Wind", "EJ/yr", 5, 17],
        ["Secondary Energy|Electricity|Wind|Offshore", "EJ/yr", 1, 5],
        ["Secondary Energy|Electricity|Wind|Onshore", "EJ/yr", 4, 12],
        ["Secondary Energy|Electricity|Solar", "EJ/yr", np.nan, 2],
    ],
    columns=["variable", "unit"] + TEST_YEARS,
)


@pytest.mark.parametrize(
    "variable,data",
@@ -133,54 +122,43 @@ def test_aggregate_by_list_with_components_raises(simple_df):
    pytest.raises(ValueError, simple_df.aggregate, v, components=components)


@pytest.mark.parametrize("time_col", (("year"), ("time")))
def test_aggregate_recursive(time_col):
def test_aggregate_recursive(recursive_df):
    # use the feature `recursive=True`
    data = (
        RECURSIVE_DF
        if time_col == "year"
        else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns")
    )
    df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World")
    df2 = df.rename(scenario={"scen_a": "scen_b"})
    df2._data *= 2
    df.append(df2, inplace=True)

    # create object without variables to be aggregated
    v = "Secondary Energy|Electricity"
    agg_vars = [f"{v}{i}" for i in ["", "|Wind"]]
    df_minimal = df.filter(variable=agg_vars, keep=False)
    df_minimal = recursive_df.filter(variable=agg_vars, keep=False)

    # return recursively aggregated data as new object
    obs = df_minimal.aggregate(variable=v, recursive=True)
    assert_iamframe_equal(obs, df.filter(variable=agg_vars))
    assert_iamframe_equal(obs, recursive_df.filter(variable=agg_vars))

    # append to `self`
    df_minimal.aggregate(variable=v, recursive=True, append=True)
    assert_iamframe_equal(df_minimal, df)
    assert_iamframe_equal(df_minimal, recursive_df)


@pytest.mark.parametrize("time_col", (("year"), ("time")))
def test_aggregate_skip_intermediate(time_col):
    # use the feature `recursive=skip-validate`
    data = (
        RECURSIVE_DF
        if time_col == "year"
        else RECURSIVE_DF.rename(DTS_MAPPING, axis="columns")
    )
    df = IamDataFrame(data, model="model_a", scenario="scen_a", region="World")
    df2 = df.rename(scenario={"scen_a": "scen_b"})
    df2._data *= 2
    df.append(df2, inplace=True)
def test_aggregate_skip_intermediate(recursive_df):
    # make the data inconsistent, check (and then skip) validation

    recursive_df._data.iloc[0] = recursive_df._data.iloc[0] + 2
    recursive_df._data.iloc[3] = recursive_df._data.iloc[3] + 2

    # create object without variables to be aggregated, but with intermediate variables
    v = "Secondary Energy|Electricity"
    agg_vars = [f"{v}{i}" for i in [""]]
    df_minimal = df.filter(variable=agg_vars, scenario="scen_a", keep=False)
    df_minimal = recursive_df.filter(variable=v, scenario="scen_a", keep=False)
    agg_vars = [f"{v}{i}" for i in ["", "|Wind"]]
    df_minimal.filter(variable=agg_vars, scenario="scen_b", keep=False, inplace=True)

    # append to `self`
    # simply calling recursive aggregation raises an error
    match = "Aggregated values are inconsistent with existing data:"
    with pytest.raises(ValueError, match=match):
        df_minimal.aggregate(variable=v, recursive=True, append=True)

    # append to `self` with skipping validation
    df_minimal.aggregate(variable=v, recursive="skip-validate", append=True)
    assert_iamframe_equal(df_minimal, df)
    assert_iamframe_equal(df_minimal, recursive_df)


@pytest.mark.parametrize(
12 changes: 6 additions & 6 deletions tests/test_feature_compare.py
@@ -10,15 +10,15 @@ def test_compare(test_df):
    clone._data.iloc[0] = 2
    clone.rename(variable={"Primary Energy|Coal": "Primary Energy|Gas"}, inplace=True)

    obs = compare(test_df, clone, right_label="test_df", left_label="clone")
    obs = compare(test_df, clone, left_label="test_df", right_label="clone")

    exp = pd.DataFrame(
        [
            ["Primary Energy", "EJ/yr", dt.datetime(2005, 6, 17), 2, 1],
            ["Primary Energy|Coal", "EJ/yr", dt.datetime(2005, 6, 17), np.nan, 0.5],
            ["Primary Energy|Coal", "EJ/yr", dt.datetime(2010, 7, 21), np.nan, 3],
            ["Primary Energy|Gas", "EJ/yr", dt.datetime(2005, 6, 17), 0.5, np.nan],
            ["Primary Energy|Gas", "EJ/yr", dt.datetime(2010, 7, 21), 3, np.nan],
            ["Primary Energy", "EJ/yr", dt.datetime(2005, 6, 17), 1, 2],
            ["Primary Energy|Coal", "EJ/yr", dt.datetime(2005, 6, 17), 0.5, np.nan],
            ["Primary Energy|Coal", "EJ/yr", dt.datetime(2010, 7, 21), 3, np.nan],
            ["Primary Energy|Gas", "EJ/yr", dt.datetime(2005, 6, 17), np.nan, 0.5],
            ["Primary Energy|Gas", "EJ/yr", dt.datetime(2010, 7, 21), np.nan, 3],
        ],
        columns=["variable", "unit", "time", "test_df", "clone"],
    )
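The swapped labels and expected values reflect that the refactored compare() returns its columns in the order of the labels passed (left first); a short sketch continuing the test above:

# the left label now comes first in the result
obs = compare(test_df, clone, left_label="test_df", right_label="clone")
assert list(obs.columns) == ["test_df", "clone"]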