Skip to content

Commit

Permalink
Merge pull request #344 from os-climate/vault-pytest
Browse files Browse the repository at this point in the history
Major re-work of Data Vault code to better support pytest unit testing.  Bumps to version 1.1.1, which ITR-examples will have to follow.
  • Loading branch information
MichaelTiemannOSC authored Dec 14, 2023
2 parents 8623104 + f51215a commit 2d53417
Show file tree
Hide file tree
Showing 13 changed files with 1,624 additions and 895 deletions.
179 changes: 178 additions & 1 deletion pdm.lock

Large diffs are not rendered by default.

57 changes: 30 additions & 27 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ITR"
version = "v1.0.11"
version = "v1.1.1"
description = "Assess the temperature alignment of current targets, commitments, and investment and lending portfolios."
authors = [
{ name = "Michael Tiemann", email = "72577720+MichaelTiemannOSC@users.noreply.github.com" },
Expand Down Expand Up @@ -29,32 +29,35 @@ classifiers = [
]

dependencies = [
"autoapi>=2.0.1",
"iam-units>=2022.10.27",
"numpy==1.24.3",
"openpyxl==3.0.10",
"openscm-units==0.5.2",
"orca==1.8",
"osc-ingest-tools>=0.5.2",
"pandas>=2.1.0",
"pip>=23.3.1",
"Pint>=0.22",
"Pint-Pandas>=0.5",
"psutil==5.9.5",
"pydantic>=2.3.0",
"pygithub==1.55",
"pytest==7.3.2",
"python-dotenv==1.0.0",
"setuptools>=65.7.0",
"sphinx<8,>=6",
"sphinx-autoapi>=2.0.1",
"sphinx-autodoc-typehints",
"sphinx-rtd-theme==1.3.0",
"SQLAlchemy>=2.0.20",
"tables>=3.8.0",
"trino==0.326.0",
"wheel>=0.41.0",
"xlrd==2.0.1",
"autoapi>=2.0.1",
"fastparquet>=2023.10.1",
"iam-units>=2022.10.27",
"numpy==1.24.3",
"openpyxl==3.0.10",
"openscm-units==0.5.2",
"orca==1.8",
"osc-ingest-tools>=0.5.2",
"pandas>=2.1.0",
"pip>=23.3.1",
"Pint>=0.23",
"Pint-Pandas>=0.5",
"psutil==5.9.5",
"pydantic>=2.3.0",
"pygithub==1.55",
"pytest==7.3.2",
"python-dotenv==1.0.0",
"setuptools>=65.7.0",
"sphinx<8,>=6",
"sphinx-autoapi>=2.0.1",
"sphinx-autodoc-typehints",
"sphinx-rtd-theme==1.3.0",
"SQLAlchemy>=2.0.20",
"tables>=3.8.0",
"trino==0.326.0",
"wheel>=0.41.0",
"xlrd==2.0.1",
"mypy-boto3-s3>=1.33.2",
"boto3-stubs-lite>=1.33.13",
]

[project.urls]
Expand Down
5 changes: 0 additions & 5 deletions src/ITR/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ class ColumnsConfig:
BASE_YEAR_PRODUCTION = "base_year_production"
GHG_SCOPE12 = "ghg_s1s2"
GHG_SCOPE3 = "ghg_s3"
TEMPLATE_SCOPE1 = "em_s1"
TEMPLATE_SCOPE2 = "em_s2"
TEMPLATE_SCOPE12 = "em_s1s2"
TEMPLATE_SCOPE3 = "em_s3"
TEMPLATE_SCOPE123 = "em_s1s2s3"
HISTORIC_DATA = "historic_data"
TARGET_DATA = "target_data"
TEMPLATE_PRODUCTION = "production"
Expand Down
141 changes: 56 additions & 85 deletions src/ITR/data/base_providers.py

Large diffs are not rendered by default.

46 changes: 33 additions & 13 deletions src/ITR/data/data_providers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List, Optional
from typing import List, Optional, Type

import pandas as pd

Expand Down Expand Up @@ -37,6 +37,22 @@ def __init__(self, **kwargs):
"""
pass

@property
@abstractmethod
def column_config(self) -> Type[ColumnsConfig]:
"""
Return the ColumnsConfig associated with this Data Provider
"""
raise NotImplementedError

@property
@abstractmethod
def own_data(self) -> bool:
"""
Return True if this object contains its own data; false if data housed elsewhere
"""
raise NotImplementedError

@abstractmethod
def get_projection_controls(self) -> ProjectionControls:
"""
Expand Down Expand Up @@ -150,7 +166,14 @@ def __init__(self, **kwargs):
:param config: A dictionary containing the configuration parameters for this data provider.
"""
pass
self._own_data = False

@property
def own_data(self) -> bool:
"""
:return: True if this object contains its own data; false if data housed elsewhere
"""
return self._own_data

@abstractmethod
def benchmark_changed(self, production_benchmark: ProductionBenchmarkDataProvider) -> bool:
Expand All @@ -166,17 +189,6 @@ def get_company_projected_production(self, ghg_scope12: pd.DataFrame) -> pd.Data
"""
raise NotImplementedError

@abstractmethod
def get_benchmark_projections(self, company_secor_region_info: pd.DataFrame) -> pd.DataFrame:
"""
get the sector emissions for a list of companies.
If there is no data for the sector, then it will be replaced by the global value
:param company_secor_region_info: DataFrame with at least the following columns :
ColumnsConfig.COMPANY_ID, ColumnsConfig.SECTOR and ColumnsConfig.REGION
:return: A DataFrame with company and intensity benchmarks per calendar year per row
"""
raise NotImplementedError


class IntensityBenchmarkDataProvider(ABC):
"""
Expand All @@ -203,6 +215,14 @@ def __init__(
self._benchmark_temperature = benchmark_temperature
self._is_AFOLU_included = is_AFOLU_included
self._benchmark_global_budget = benchmark_global_budget
self._own_data = False

@property
def own_data(self) -> bool:
"""
:return: True if this object contains its own data; false if data housed elsewhere
"""
return self._own_data

@abstractmethod
def get_scopes(self) -> List[EScope]:
Expand Down
115 changes: 71 additions & 44 deletions src/ITR/data/data_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def __init__(
benchmark_projected_production: Optional[ProductionBenchmarkDataProvider],
benchmarks_projected_ei: Optional[IntensityBenchmarkDataProvider],
estimate_missing_data: Optional[Callable[["DataWarehouse", ICompanyData], None]] = None,
column_config: Type[ColumnsConfig] = ColumnsConfig,
):
"""
Create a new data warehouse instance.
Expand All @@ -65,7 +64,6 @@ def __init__(
# benchmarks_projected_ei._EI_df_t is the (transposed) EI dataframe for the benchmark
# benchmark_projected_production.get_company_projected_production(company_sector_region_scope) gives production data per company (per year)
# multiplying these two gives aligned emissions data for the company, in case we want to add missing data based on sector averages
self.column_config = column_config
self.company_data = company_data
self.estimate_missing_data = estimate_missing_data
# Place to stash historic data before doing PC-conversion so it can be retreived when switching to non-PC benchmarks
Expand All @@ -76,8 +74,17 @@ def __init__(
# Trajectories + Emissions Intensities benchmark data are needed to estimate missing S3 data
# Target projections rely both on Production benchmark data and S3 estimated data
# Production-centric manipulations must happen after targets have been projected
if benchmark_projected_production is not None or benchmarks_projected_ei is not None:
if (benchmark_projected_production is not None and benchmark_projected_production.own_data) or (
benchmarks_projected_ei is not None and benchmarks_projected_ei.own_data
):
self.update_benchmarks(benchmark_projected_production, benchmarks_projected_ei)
self._own_data = True
else:
self._own_data = False

@property
def own_data(self) -> bool:
return self._own_data

def _preserve_historic_data(self):
for c in self.company_data._companies:
Expand Down Expand Up @@ -137,19 +144,31 @@ def update_benchmarks(
Update the benchmark data used in this instance of the DataWarehouse. If there is no change, do nothing.
"""
new_production_bm = new_ei_bm = new_prod_centric = False
if self.benchmark_projected_production is None or self.benchmark_projected_production.benchmark_changed(
benchmark_projected_production
if benchmark_projected_production is None:
pass
if self.benchmark_projected_production is None or (
self.benchmark_projected_production.own_data
and self.benchmark_projected_production.benchmark_changed(benchmark_projected_production)
):
self.benchmark_projected_production = benchmark_projected_production # type: ignore
new_production_bm = True

if self.benchmarks_projected_ei is None:
if benchmarks_projected_ei is None:
pass
elif self.benchmarks_projected_ei is None:
self.benchmarks_projected_ei = benchmarks_projected_ei # type: ignore
new_ei_bm = True
elif self.benchmarks_projected_ei.benchmarks_changed(benchmarks_projected_ei):
new_prod_centric = self.benchmarks_projected_ei.prod_centric_changed(benchmarks_projected_ei)
if benchmarks_projected_ei.own_data:
new_ei_bm = True
elif self.benchmarks_projected_ei.own_data:
if benchmarks_projected_ei.own_data:
if self.benchmarks_projected_ei.benchmarks_changed(benchmarks_projected_ei):
new_prod_centric = self.benchmarks_projected_ei.prod_centric_changed(benchmarks_projected_ei)
new_ei_bm = True
self.benchmarks_projected_ei = benchmarks_projected_ei
new_ei_bm = True

if not new_production_bm and not new_ei_bm:
return

assert self.benchmarks_projected_ei is not None

# Production benchmark data is needed to project trajectories
Expand All @@ -174,7 +193,7 @@ def update_benchmarks(

# If we are missing S3 (or other) data, fill in before projecting targets
if new_ei_bm and self.estimate_missing_data is not None:
logger.info(f"estimating missing data")
logger.info("estimating missing data")
for c in self.company_data.get_company_data():
self.estimate_missing_data(self, c)

Expand All @@ -190,7 +209,7 @@ def update_benchmarks(
# If our benchmark is production-centric, migrate S3 data (including estimated S3 data) into S1S2
# If we shift before we project, then S3 targets will not be projected correctly.
if new_ei_bm and benchmarks_projected_ei.is_production_centric():
logger.info(f"Shifting S3 emissions data into S1 according to Production-Centric benchmark rules")
logger.info("Shifting S3 emissions data into S1 according to Production-Centric benchmark rules")
if self.orig_historic_data != {}:
self._restore_historic_data()
else:
Expand All @@ -201,7 +220,7 @@ def update_benchmarks(
if not ITR.isna(c.ghg_s3):
c.ghg_s1s2 = c.ghg_s1s2 + c.ghg_s3
c.ghg_s3 = None # Q_(0.0, c.ghg_s3.u)
if not c.historic_data.empty():
if not c.historic_data.empty:

def _adjust_historic_data(data, primary_scope_attr, data_adder):
if data[primary_scope_attr]:
Expand Down Expand Up @@ -365,7 +384,9 @@ def update_trajectories(self):
# We cannot only update trajectories without regard for all that depend on those trajectories
# For example, different benchmarks may have different scopes defined, units for benchmarks, etc.
logger.info(
f"re-calculating trajectories for {len(self.company_data._companies)} companies\n (times {len(EScope.get_scopes())} scopes times {self.company_data.projection_controls.TARGET_YEAR-self.company_data.projection_controls.BASE_YEAR} years)"
f"re-calculating trajectories for {len(self.company_data._companies)} companies"
f"\n (times {len(EScope.get_scopes())} scopes times "
f"{self.company_data.projection_controls.TARGET_YEAR-self.company_data.projection_controls.BASE_YEAR} years)"
)
for company in self.company_data._companies:
company.projected_intensities = None
Expand Down Expand Up @@ -526,9 +547,12 @@ def _process_company_data(
df_budget = DataWarehouse._get_cumulative_emissions(
projected_ei=budgeted_ei, projected_production=projected_production
)
base_year_scale = df_trajectory.loc[df_budget.index][base_year].mul(
df_budget[base_year].map(lambda x: Q_(0.0, f"1/({x.u})") if x.m == 0.0 else 1 / x)
)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Quieting warnings due to https://github.com/hgrecco/pint/issues/1897
base_year_scale = df_trajectory.loc[df_budget.index][base_year].mul(
df_budget[base_year].map(lambda x: Q_(0.0, f"1/({x.u})") if x.m == 0.0 else 1 / x)
)
df_scaled_budget = df_budget.mul(base_year_scale, axis=0)
# FIXME: we calculate exceedance only against df_budget, not also df_scaled_budget
# df_trajectory_exceedance = self._get_exceedance_year(df_trajectory, df_budget, self.company_data.projection_controls.TARGET_YEAR, None)
Expand Down Expand Up @@ -577,7 +601,7 @@ def get_preprocessed_company_data(self, company_ids: List[str]) -> List[ICompany

company_data = self.company_data.get_company_data(company_ids)
df_company_data = pd.DataFrame.from_records([dict(c) for c in company_data]).set_index(
self.column_config.COMPANY_ID, drop=False
self.company_data.column_config.COMPANY_ID, drop=False
)
valid_company_ids = df_company_data.index.to_list()

Expand Down Expand Up @@ -649,7 +673,7 @@ def _convert_df_to_model(self, df_company_data: pd.DataFrame) -> List[ICompanyAg
except ValidationError:
logger.warning(
"(one of) the input(s) of company %s is invalid and will be skipped"
% company_data[self.column_config.COMPANY_NAME]
% company_data[self.company_data.column_config.COMPANY_NAME]
)
pass
return model_companies
Expand All @@ -671,31 +695,34 @@ def _get_cumulative_emissions(cls, projected_ei: pd.DataFrame, projected_product

# Ensure that projected_production is ordered the same as projected_ei, preserving order of projected_ei
# projected_production is constructed to be limited to the years we want to analyze
proj_prod_t = asPintDataFrame(projected_production.loc[projected_ei.index].T)
# Limit projected_ei to the year range of projected_production
proj_ei_t = asPintDataFrame(projected_ei[proj_prod_t.index].T)
units_CO2e = "t CO2e"
# We use pd.concat because pd.combine reverts our PintArrays into object arrays :-/
proj_CO2e_m_t = pd.concat(
[
ITR.data.osc_units.align_production_to_bm(proj_prod_t[col], proj_ei_t[col])
.mul(proj_ei_t[col])
.pint.m_as(units_CO2e)
for col in proj_ei_t.columns
],
axis=1,
)
# pd.concat names parameter refers to index.names; there's no way to set columns.names
proj_CO2e_m_t.columns.names = proj_ei_t.columns.names
if ITR.HAS_UNCERTAINTIES:
# Sum both the nominal and std_dev values, because these series are completely correlated
# Note that NaNs in this dataframe will be nan+/-nan, showing up in both nom and err
nom_CO2e_m_t = proj_CO2e_m_t.apply(ITR.nominal_values).cumsum()
err_CO2e_m_t = proj_CO2e_m_t.apply(ITR.std_devs).cumsum()
cumulative_emissions_m_t = nom_CO2e_m_t.combine(err_CO2e_m_t, ITR.recombine_nom_and_std)
else:
cumulative_emissions_m_t = proj_CO2e_m_t.cumsum()
return cumulative_emissions_m_t.T.astype(f"pint[{units_CO2e}]")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Quieting warnings due to https://github.com/hgrecco/pint/issues/1897
proj_prod_t = asPintDataFrame(projected_production.loc[projected_ei.index].T)
# Limit projected_ei to the year range of projected_production
proj_ei_t = asPintDataFrame(projected_ei[proj_prod_t.index].T)
units_CO2e = "t CO2e"
# We use pd.concat because pd.combine reverts our PintArrays into object arrays :-/
proj_CO2e_m_t = pd.concat(
[
ITR.data.osc_units.align_production_to_bm(proj_prod_t[col], proj_ei_t[col])
.mul(proj_ei_t[col])
.pint.m_as(units_CO2e)
for col in proj_ei_t.columns
],
axis=1,
)
# pd.concat names parameter refers to index.names; there's no way to set columns.names
proj_CO2e_m_t.columns.names = proj_ei_t.columns.names
if ITR.HAS_UNCERTAINTIES:
# Sum both the nominal and std_dev values, because these series are completely correlated
# Note that NaNs in this dataframe will be nan+/-nan, showing up in both nom and err
nom_CO2e_m_t = proj_CO2e_m_t.apply(ITR.nominal_values).cumsum()
err_CO2e_m_t = proj_CO2e_m_t.apply(ITR.std_devs).cumsum()
cumulative_emissions_m_t = nom_CO2e_m_t.combine(err_CO2e_m_t, ITR.recombine_nom_and_std)
else:
cumulative_emissions_m_t = proj_CO2e_m_t.cumsum()
return cumulative_emissions_m_t.T.astype(f"pint[{units_CO2e}]")

@classmethod
def _get_exceedance_year(
Expand Down
Loading

0 comments on commit 2d53417

Please sign in to comment.