Skip to content

Commit

Permalink
9 add support for min number of sales when performing statistical flagging (ccao-data#14)
Browse files Browse the repository at this point in the history

Add support for min number of sales when performing statistical flagging

---------

Co-authored-by: Michael Wagner <miwagne@ccao-datals.ccao.local>
  • Loading branch information
wagnerlmichael and Michael Wagner authored Sep 21, 2023
1 parent fd79d2e commit 6ec2968
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: python3.10.12
python-version: 3.10.12

- name: Install pre-commit
run: sudo apt-get install pre-commit
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ repos:
rev: 23.7.0
hooks:
- id: black
language_version: python3.10.12
language_version: 3.10.12
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ erDiagram
arraystring iso_forest_cols
arraystring stat_groups
arraybigint dev_bounds
bigint rolling_window
string date_floor
bigint min_group_thresh
}
group_mean {
bigint group_size
double mean_price
double mean_price_sqft
string run_id PK
Expand Down
93 changes: 88 additions & 5 deletions glue/sales_val_flagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,63 @@ def add_rolling_window(df, num_months):
return df


def group_size_adjustment(df, stat_groups: list, min_threshold, condos: bool):
    """
    Within the groups of sales we are looking at to flag outliers, some
    are very small, with a large portion of groups with even 1 total sale.
    This function manually sets price-based outlier flags to 'Not outlier'
    for all sales that belong to a group whose size is at or below
    'min_threshold' (note the comparison is inclusive: a group of exactly
    'min_threshold' sales is also adjusted).

    Inputs:
        df: The data right after we perform the flagging script (go()), when
            the exploded rolling window hasn't been reduced.
        stat_groups: stat groups we are using for the groups within which we
            flag outliers (columns passed to groupby).
        min_threshold: group size at or below which price outlier values are
            manually reset to 'Not outlier'.
        condos: boolean that tells the function to work with condos (True) or
            res (False); condos only carry raw-price outlier types since
            they have no price-per-sqft flags.
    Outputs:
        df: dataframe with newly manually adjusted outlier values
            ('sv_outlier_type' and 'sv_is_outlier'); all other rows and
            columns are returned unchanged.
    """
    # Identify groups too small for statistical flagging to be trusted
    group_counts = df.groupby(stat_groups).size().reset_index(name="count")
    filtered_groups = group_counts[group_counts["count"] <= min_threshold]

    # Left-merge with indicator=True so rows belonging to a small group
    # come back with _merge == "both"
    merged_df = pd.merge(
        df, filtered_groups[stat_groups], on=stat_groups, how="left", indicator=True
    )

    # Only price-based sv_outlier_type values are reverted; any other
    # outlier type present in the data is left untouched
    if not condos:
        outlier_types_to_check = [
            "Low price (raw & sqft)",
            "Low price (raw)",
            "Low price (sqft)",
            "High price (raw & sqft)",
            "High price (raw)",
            "High price (sqft)",
        ]
    else:
        outlier_types_to_check = [
            "Low price (raw)",
            "High price (raw)",
        ]

    # Rows that are both in a small group AND carry a price outlier type
    condition = (merged_df["_merge"] == "both") & (
        merged_df["sv_outlier_type"].isin(outlier_types_to_check)
    )

    # Reset the flag and the binary outlier indicator for matching rows
    merged_df.loc[condition, "sv_outlier_type"] = "Not outlier"
    merged_df.loc[condition, "sv_is_outlier"] = 0

    # Drop the merge-indicator helper column before returning
    df_flagged_updated = merged_df.drop(columns=["_merge"])

    return df_flagged_updated


def finish_flags(df, start_date, manual_update):
"""
This functions
Expand Down Expand Up @@ -199,6 +256,11 @@ def get_group_mean_df(df, stat_groups, run_id, condos):
Outputs:
df: dataframe that is ready to be written to athena as a parquet
"""

# Calculate group sizes
group_sizes = df.groupby(stat_groups).size().reset_index(name="group_size")
df = df.merge(group_sizes, on=stat_groups, how="left")

unique_groups = (
df.drop_duplicates(subset=stat_groups, keep="first")
.reset_index(drop=True)
Expand All @@ -216,9 +278,11 @@ def get_group_mean_df(df, stat_groups, run_id, condos):
else:
suffixes = ["mean_price"]

cols_to_write_means = stat_groups + [
f"sv_{suffix}_{groups_string_col}" for suffix in suffixes
]
cols_to_write_means = (
stat_groups
+ ["group_size"]
+ [f"sv_{suffix}_{groups_string_col}" for suffix in suffixes]
)
rename_dict = {
f"sv_{suffix}_{groups_string_col}": f"{suffix}" for suffix in suffixes
}
Expand Down Expand Up @@ -272,6 +336,7 @@ def get_parameter_df(
rolling_window,
date_floor,
short_term_thresh,
min_group_thresh,
run_id,
):
"""
Expand Down Expand Up @@ -299,6 +364,7 @@ def get_parameter_df(
dev_bounds = dev_bounds
date_floor = date_floor
rolling_window = rolling_window
min_group_thresh = min_group_thresh

parameter_dict_to_df = {
"run_id": [run_id],
Expand All @@ -311,6 +377,7 @@ def get_parameter_df(
"dev_bounds": [dev_bounds],
"rolling_window": [rolling_window],
"date_floor": [date_floor],
"min_group_thresh": [min_group_thresh],
}

df_parameters = pd.DataFrame(parameter_dict_to_df)
Expand Down Expand Up @@ -380,6 +447,7 @@ def write_to_table(df, table_name, s3_warehouse_bucket_path, run_id):
"rolling_window_num",
"time_frame_start",
"iso_forest",
"min_groups_threshold",
"dev_bounds",
],
)
Expand Down Expand Up @@ -566,6 +634,13 @@ def write_to_table(df, table_name, s3_warehouse_bucket_path, run_id):
condos=False,
)

df_res_flagged_updated = group_size_adjustment(
df=df_res_flagged,
stat_groups=tuple(stat_groups_list),
min_threshold=int(args["min_groups_threshold"]),
condos=False,
)

# Flag condo outliers
condo_iso_forest = iso_forest_list.copy()
condo_iso_forest.remove("sv_price_per_sqft")
Expand All @@ -578,10 +653,17 @@ def write_to_table(df, table_name, s3_warehouse_bucket_path, run_id):
condos=True,
)

df_flagged_merged = pd.concat([df_res_flagged, df_condo_flagged]).reset_index(
drop=True
df_condo_flagged_updated = group_size_adjustment(
df=df_condo_flagged,
stat_groups=tuple(stat_groups_list),
min_threshold=int(args["min_groups_threshold"]),
condos=True,
)

df_flagged_merged = pd.concat(
[df_res_flagged_updated, df_condo_flagged_updated]
).reset_index(drop=True)

# Finish flagging
df_flagged_final, run_id, timestamp = finish_flags(
df=df_flagged_merged,
Expand Down Expand Up @@ -614,6 +696,7 @@ def write_to_table(df, table_name, s3_warehouse_bucket_path, run_id):
rolling_window=int(args["rolling_window_num"]),
date_floor=args["time_frame_start"],
short_term_thresh=SHORT_TERM_OWNER_THRESHOLD,
min_group_thresh=int(args["min_groups_threshold"]),
run_id=run_id,
)

Expand Down
19 changes: 18 additions & 1 deletion manual_flagging/initial_flagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@
condos=False,
)

df_res_flagged_updated = flg.group_size_adjustment(
df=df_res_flagged,
stat_groups=inputs["stat_groups"],
min_threshold=inputs["min_groups_threshold"],
condos=False,
)

# Flag condo outliers
condo_iso_forest = inputs["iso_forest"].copy()
condo_iso_forest.remove("sv_price_per_sqft")
Expand All @@ -149,7 +156,16 @@
condos=True,
)

df_flagged_merged = pd.concat([df_res_flagged, df_condo_flagged]).reset_index(drop=True)
df_condo_flagged_updated = flg.group_size_adjustment(
df=df_condo_flagged,
stat_groups=inputs["stat_groups"],
min_threshold=inputs["min_groups_threshold"],
condos=True,
)

df_flagged_merged = pd.concat(
[df_res_flagged_updated, df_condo_flagged_updated]
).reset_index(drop=True)

# Finish flagging and subset to write to flag table
df_to_write, run_id, timestamp = flg.finish_flags(
Expand All @@ -176,6 +192,7 @@
rolling_window=inputs["rolling_window_months"],
date_floor=inputs["time_frame"]["start"],
short_term_thresh=SHORT_TERM_OWNER_THRESHOLD,
min_group_thresh=inputs["min_groups_threshold"],
run_id=run_id,
)

Expand Down
19 changes: 18 additions & 1 deletion manual_flagging/manual_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@
condos=False,
)

df_res_flagged_updated = flg.group_size_adjustment(
df=df_res_flagged,
stat_groups=inputs["stat_groups"],
min_threshold=inputs["min_groups_threshold"],
condos=False,
)

# Flag condo outliers
condo_iso_forest = inputs["iso_forest"].copy()
condo_iso_forest.remove("sv_price_per_sqft")
Expand All @@ -159,7 +166,16 @@
condos=True,
)

df_flagged_merged = pd.concat([df_res_flagged, df_condo_flagged]).reset_index(drop=True)
df_condo_flagged_updated = flg.group_size_adjustment(
df=df_condo_flagged,
stat_groups=inputs["stat_groups"],
min_threshold=inputs["min_groups_threshold"],
condos=True,
)

df_flagged_merged = pd.concat(
[df_res_flagged_updated, df_condo_flagged_updated]
).reset_index(drop=True)

# Finish flagging and subset to write to flag table
df_flagged_final, run_id, timestamp = flg.finish_flags(
Expand Down Expand Up @@ -211,6 +227,7 @@
rolling_window=inputs["rolling_window_months"],
date_floor=inputs["time_frame"]["start"],
short_term_thresh=SHORT_TERM_OWNER_THRESHOLD,
min_group_thresh=inputs["min_groups_threshold"],
run_id=run_id,
)

Expand Down
2 changes: 2 additions & 0 deletions manual_flagging/yaml/inputs_initial.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ time_frame:
end: "2020-06-30"

rolling_window_months: 12

min_groups_threshold: 30
2 changes: 2 additions & 0 deletions manual_flagging/yaml/inputs_update.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ time_frame:
end: "2021-12-31"

rolling_window_months: 12

min_groups_threshold: 30

0 comments on commit 6ec2968

Please sign in to comment.