Refactor process_wildcards and add support for TFM_MIG #166

Merged: 11 commits, Jan 31, 2024
xl2times/transforms.py: 271 changes (141 additions, 130 deletions)
@@ -747,7 +747,6 @@ def process_units(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:

units_map = {
"activity": model.processes["tact"].unique(),
"capacity": model.processes["tcap"].unique(),
@@ -767,7 +766,6 @@ def process_time_periods(
tables: List[datatypes.EmbeddedXlTable],
model: datatypes.TimesModel,
) -> List[datatypes.EmbeddedXlTable]:

model.start_year = utils.get_scalar(datatypes.Tag.start_year, tables)
active_pdef = utils.get_scalar(datatypes.Tag.active_p_def, tables)
df = utils.single_table(tables, datatypes.Tag.time_periods).dataframe.copy()
@@ -822,20 +820,19 @@ def complete_dictionary(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:

for k, v in {
"AllRegions": model.all_regions,
"Regions": model.internal_regions,
"DataYears": model.data_years,
"PastYears": model.past_years,
"ModelYears": model.model_years,
}.items():
for k, v in [
Member:

Just curious as to why it is better to use a list of tuples here instead of a dictionary. Should the dictionary below be converted as well?

Collaborator Author:

Ah, this is because iterating through a set results in a non-deterministic order (see also #50 and #67). This makes it very difficult to debug regressions, because I rely on the --verbose flag and a diff tool to find out which transformation caused the regression. But with non-deterministic behaviour, there are too many changes in the diff.

I'm considering adding a check to CI that runs the tool on a small benchmark ~5-10 times and ensures that all intermediate tables are identical across runs.
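
(For illustration, a minimal sketch of such a CI check, assuming a hypothetical run_tool() entry point that returns the intermediate tables keyed by name; the helper name and benchmark path are assumptions, not part of this PR:)

    import pandas as pd

    def test_deterministic_tables():
        # Run the converter several times on the same small benchmark (run_tool is hypothetical).
        runs = [run_tool("benchmarks/demo") for _ in range(5)]
        baseline = runs[0]
        for other in runs[1:]:
            assert baseline.keys() == other.keys()
            for name in baseline:
                # Every intermediate table must be identical across runs.
                pd.testing.assert_frame_equal(baseline[name], other[name])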

Collaborator Author:

I left the dictionary below unchanged because the loop only assigns to a dictionary, so the order doesn't matter.
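
(A small illustration of that point, not from the PR: assigning into a dict is order-independent, whereas the row order of a DataFrame built from a set depends on iteration order:)

    import pandas as pd

    # Assigning into a dict in different orders produces the same mapping.
    result_a, result_b = {}, {}
    for k, v in [("x", 1), ("y", 2)]:
        result_a[k] = v
    for k, v in [("y", 2), ("x", 1)]:
        result_b[k] = v
    assert result_a == result_b

    # By contrast, the row order here depends on set iteration order,
    # which is why sorted(v) is used above.
    df = pd.DataFrame(sorted({2030, 2020, 2025}), columns=["year"])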

Member:

The benchmark idea sounds good!

("AllRegions", model.all_regions),
("Regions", model.internal_regions),
("DataYears", model.data_years),
("PastYears", model.past_years),
("ModelYears", model.model_years),
]:
if "region" in k.lower():
column_list = ["region"]
else:
column_list = ["year"]

tables[k] = pd.DataFrame(v, columns=column_list)
tables[k] = pd.DataFrame(sorted(v), columns=column_list)

# Dataframes
for k, v in {
@@ -1569,6 +1566,7 @@ def process_transform_insert(
datatypes.Tag.tfm_dins,
datatypes.Tag.tfm_topins,
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_mig,
datatypes.Tag.tfm_comgrp,
]

@@ -1582,12 +1580,18 @@
datatypes.Tag.tfm_ins,
datatypes.Tag.tfm_ins_txt,
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_mig,
datatypes.Tag.tfm_comgrp,
]:
df = table.dataframe.copy()

# Standardize column names
known_columns = config.known_columns[datatypes.Tag.tfm_ins] | query_columns
if table.tag == datatypes.Tag.tfm_mig:
# Also allow attribute2, year2 etc for TFM_MIG tables
known_columns.update(
(c + "2" for c in config.known_columns[datatypes.Tag.tfm_ins])
)
Comment on lines 1597 to +1602
Member:

Suggested change:
- known_columns = config.known_columns[datatypes.Tag.tfm_ins] | query_columns
- if table.tag == datatypes.Tag.tfm_mig:
-     # Also allow attribute2, year2 etc for TFM_MIG tables
-     known_columns.update(
-         (c + "2" for c in config.known_columns[datatypes.Tag.tfm_ins])
-     )
+ known_columns = config.known_columns[datatypes.Tag(table.tag)] | query_columns

Member:

Should this not be table specific?

Member:

Btw, we could also expand the tags file with info on whether a specific column is a query column and generate a list of query columns from there.
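
(A rough sketch of how that could look, assuming a hypothetical per-field "query_field" flag in the tags JSON file; the file path, entry layout, and field names here are guesses, not the actual config format:)

    import json

    def load_query_columns(path: str) -> dict[str, set[str]]:
        # Hypothetical: assumes each tag entry lists its fields, with a boolean
        # "query_field" flag marking columns used to query/filter other tables.
        with open(path) as f:
            tag_defs = json.load(f)
        return {
            tag["tag_name"]: {
                field["name"]
                for field in tag.get("valid_fields", [])
                if field.get("query_field")
            }
            for tag in tag_defs
        }

    # e.g. query_columns = load_query_columns("config/veda-tags.json")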


# Handle Regions:
if set(df.columns).isdisjoint(
@@ -1851,126 +1855,133 @@ def process_wildcards(
tables: Dict[str, DataFrame],
model: datatypes.TimesModel,
) -> Dict[str, DataFrame]:
dictionary = generate_topology_dictionary(tables, model)

# TODO separate this code into expanding wildcards and updating/inserting data!
for tag in [
datatypes.Tag.tfm_upd,
datatypes.Tag.tfm_ins,
datatypes.Tag.tfm_ins_txt,
]:
if tag in tables:
start_time = time.time()
upd = tables[tag]
new_rows = []
# reset index to make sure there are no duplicates
tables[datatypes.Tag.fi_t].reset_index(drop=True, inplace=True)
if tag == datatypes.Tag.tfm_upd:
# copy old index to new column 'index'
tables[datatypes.Tag.fi_t].reset_index(inplace=True)
for i in range(0, len(upd)):
row = upd.iloc[i]
debug = False
if debug:
print(row)
matching_processes = get_matching_processes(row, dictionary)
if matching_processes is not None and len(matching_processes) == 0:
print(f"WARNING: {tag} row matched no processes")
continue
matching_commodities = get_matching_commodities(row, dictionary)
if matching_commodities is not None and len(matching_commodities) == 0:
print(f"WARNING: {tag} row matched no commodities")
continue
df = tables[datatypes.Tag.fi_t]
if any(df.index.duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if tag == datatypes.Tag.tfm_upd:
# construct query into ~FI_T to get indices of matching rows
if matching_processes is not None:
df = df.merge(matching_processes, on="process")
if debug:
print(f"{len(df)} rows after processes")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if matching_commodities is not None:
df = df.merge(matching_commodities)
if debug:
print(f"{len(df)} rows after commodities")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
attribute = row.attribute
if attribute is not None:
df = df.query("attribute == @attribute")
if debug:
print(f"{len(df)} rows after Attribute")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
region = row.region
if region is not None:
df = df.query("region == @region")
if debug:
print(f"{len(df)} rows after Region")
if any(df["index"].duplicated()):
raise ValueError("~FI_T table has duplicated indices")
# so that we can update the original table, copy original index back that was lost when merging
df = df.set_index("index")
# for speed, extract just the VALUE column as that is the only one being updated
df = df[["value"]]
if debug:
if any(df.index.duplicated()):
raise ValueError("~FI_T table has duplicated indices")
if isinstance(row.value, str) and row.value[0] in {
"*",
"+",
"-",
"/",
}:
df = df.astype({"value": float}).eval("value=value" + row.value)
else:
df["value"] = [row.value] * len(df)
if len(df) == 0:
print(f"WARNING: {tag} row matched nothing")
tables[datatypes.Tag.fi_t].update(df)
elif tag == datatypes.Tag.tfm_ins_txt:
# This row matches either a commodity or a process
assert not (
matching_commodities is not None
and matching_processes is not None
)
if matching_commodities is not None:
df = model.commodities
query_str = f"commodity in [{','.join(map(repr, matching_commodities['commodity']))}] and region == '{row['region']}'"
elif matching_processes is not None:
df = model.processes
query_str = f"process in [{','.join(map(repr, matching_processes['process']))}] and region == '{row['region']}'"
else:
print(
f"WARNING: {tag} row matched neither commodity nor process"
)
continue
# Query for rows with matching process/commodity and region
rows_to_update = df.query(query_str).index
# Overwrite (inplace) the column given by the attribute (translated by attr_prop)
# with the value from row
# E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
df.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
else:
# Construct 1-row data frame for data
# Cross merge with processes and commodities (if they exist)
row = row.filter(df.columns)
row = pd.DataFrame([row])
if matching_processes is not None:
row = matching_processes.merge(row, how="cross")
if matching_commodities is not None:
row = matching_commodities.merge(row, how="cross")
new_rows.append(row)
if tag == datatypes.Tag.tfm_ins:
new_rows.append(df) # pyright: ignore
tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)

print(
f" process_wildcards: {tag} took {time.time() - start_time:.2f} seconds for {len(upd)} rows"
topology = generate_topology_dictionary(tables, model)

# TODO add type annots to below fns

def match_wildcards(
row: pd.Series,
) -> tuple[DataFrame | None, DataFrame | None] | None:
matching_processes = get_matching_processes(row, topology)
matching_commodities = get_matching_commodities(row, topology)
if (matching_processes is None or len(matching_processes) == 0) and (
matching_commodities is None or len(matching_commodities) == 0
): # TODO is this necessary? Try without?
# TODO debug these
print(f"WARNING: a row matched no processes or commodities:\n{row}")
return None
return matching_processes, matching_commodities

def query(table, processes, commodities, attribute, region):
qs = []
if processes is not None and not processes.empty:
qs.append(f"process in [{','.join(map(repr, processes['process']))}]")
if commodities is not None and not commodities.empty:
qs.append(f"commodity in [{','.join(map(repr, commodities['commodity']))}]")
if attribute is not None:
qs.append(f"attribute == '{attribute}'")
if region is not None:
qs.append(f"region == '{region}'")
return table.query(" and ".join(qs)).index

def eval_and_update(table, rows_to_update, new_value):
if isinstance(new_value, str) and new_value[0] in {"*", "+", "-", "/"}:
old_values = table.loc[rows_to_update, "value"]
updated = old_values.astype(float).map(lambda x: eval("x" + new_value))
table.loc[rows_to_update, "value"] = updated
else:
table.loc[rows_to_update, "value"] = new_value

def do_an_ins_row(row):
table = tables[datatypes.Tag.fi_t]
match = match_wildcards(row)
# TODO perf: add matched procs/comms into column and use explode?
new_rows = pd.DataFrame([row.filter(table.columns)])
if match is not None:
processes, commodities = match
if processes is not None:
new_rows = processes.merge(new_rows, how="cross")
if commodities is not None:
new_rows = commodities.merge(new_rows, how="cross")
return new_rows

def do_an_ins_txt_row(row):
match = match_wildcards(row)
if match is None:
print(f"WARNING: TFM_INS-TXT row matched neither commodity nor process")
return
processes, commodities = match
if commodities is not None:
table = model.commodities
elif processes is not None:
table = model.processes
else:
assert False # All rows match either a commodity or a process

# Query for rows with matching process/commodity and region
rows_to_update = query(table, processes, commodities, None, row["region"])
# Overwrite (inplace) the column given by the attribute (translated by attr_prop)
# with the value from row
# E.g. if row['attribute'] == 'PRC_TSL' then we overwrite 'tslvl'
table.loc[rows_to_update, attr_prop[row["attribute"]]] = row["value"]
# return rows_to_update

if datatypes.Tag.tfm_upd in tables:
updates = tables[datatypes.Tag.tfm_upd]
table = tables[datatypes.Tag.fi_t]
# Reset FI_T index so that queries can determine unique rows to update
tables[datatypes.Tag.fi_t].reset_index(inplace=True)

# TODO perf: collect all updates and go through FI_T only once?
for _, row in updates.iterrows():
if row["value"] is None: # TODO is this really needed?
continue
match = match_wildcards(row)
if match is None:
continue
processes, commodities = match
rows_to_update = query(
table, processes, commodities, row["attribute"], row["region"]
)
eval_and_update(table, rows_to_update, row["value"])

if datatypes.Tag.tfm_ins in tables:
updates = tables[datatypes.Tag.tfm_ins]
new_rows = []
for _, row in updates.iterrows():
new_rows.append(do_an_ins_row(row))
new_rows.append(tables[datatypes.Tag.fi_t])
tables[datatypes.Tag.fi_t] = pd.concat(new_rows, ignore_index=True)

if datatypes.Tag.tfm_ins_txt in tables:
updates = tables[datatypes.Tag.tfm_ins_txt]
for _, row in updates.iterrows():
do_an_ins_txt_row(row)

if datatypes.Tag.tfm_mig in tables:
updates = tables[datatypes.Tag.tfm_mig]
table = tables[datatypes.Tag.fi_t]
new_tables = []

for _, row in updates.iterrows():
match = match_wildcards(row)
processes, commodities = match if match is not None else (None, None)
# TODO should we also query on limtype?
rows_to_update = query(
table, processes, commodities, row["attribute"], row["region"]
)
new_rows = table.loc[rows_to_update].copy()
# Modify values in all '*2' columns
for c, v in row.items():
if c.endswith("2") and v is not None:
new_rows.loc[:, c[:-1]] = v
# Evaluate 'value' column based on existing values
eval_and_update(new_rows, rows_to_update, row["value"])
new_tables.append(new_rows)

# Add new rows to table
new_tables.append(tables[datatypes.Tag.fi_t])
tables[datatypes.Tag.fi_t] = pd.concat(new_tables, ignore_index=True)

return tables
