Skip to content

Commit

Permalink
Add NHSN data pull to forecast_state (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
damonbayer authored Dec 21, 2024
1 parent f9ac57d commit 1d71aca
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 78 deletions.
1 change: 0 additions & 1 deletion pipelines/forecast_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ def main(
facility_level_nssp_data=facility_level_nssp_data,
state_level_nssp_data=state_level_nssp_data,
report_date=report_date,
state_level_report_date=state_report_date,
first_training_date=first_training_date,
last_training_date=last_training_date,
param_estimates=param_estimates,
Expand Down
128 changes: 119 additions & 9 deletions pipelines/prep_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import json
import logging
import os
import subprocess
from logging import Logger
from pathlib import Path

import forecasttools
import polars as pl
import polars.selectors as cs

_disease_map = {
"COVID-19": "COVID-19/Omicron",
Expand All @@ -15,6 +17,55 @@
_inverse_disease_map = {v: k for k, v in _disease_map.items()}


def get_nhsn(
start_date: datetime.date,
end_date: datetime.date,
disease: str,
state_abb: str,
) -> None:
def py_scalar_to_r_scalar(py_scalar):
if py_scalar is None:
return "NULL"
return f"'{str(py_scalar)}'"

disease_nhsn_key = {
"COVID-19": "totalconfc19newadm",
"Influenza": "totalconfflunewadm",
}

columns = disease_nhsn_key[disease]

state_abb = state_abb if state_abb != "US" else "USA"

r_command = [
"Rscript",
"-e",
f"""
forecasttools::pull_nhsn(
start_date = {py_scalar_to_r_scalar(start_date)},
end_date = {py_scalar_to_r_scalar(end_date)},
columns = {py_scalar_to_r_scalar(columns)},
jurisdictions = {py_scalar_to_r_scalar(state_abb)},
) |>
dplyr::mutate(weekendingdate = lubridate::as_date(weekendingdate)) |>
dplyr::rename(hospital_admissions = {py_scalar_to_r_scalar(columns)}) |>
dplyr::mutate(hospital_admissions = as.numeric(hospital_admissions)) |>
readr::write_tsv(stdout())
""",
]
result = subprocess.run(
r_command,
capture_output=True,
)
if result.returncode != 0:
raise RuntimeError(f"pull_and_save_nhsn: {result.stderr}")
raw_dat = pl.read_csv(result.stdout, separator="\t")
dat = raw_dat.with_columns(
weekendingdate=pl.col("weekendingdate").cast(pl.Date)
)
return dat


def aggregate_to_national(
data: pl.LazyFrame,
geo_values_to_include,
Expand Down Expand Up @@ -215,7 +266,6 @@ def process_and_save_state(
state_abb: str,
disease: str,
report_date: datetime.date,
state_level_report_date: datetime.date,
first_training_date: datetime.date,
last_training_date: datetime.date,
param_estimates: pl.LazyFrame,
Expand Down Expand Up @@ -275,44 +325,104 @@ def process_and_save_state(
pl.col("date") < first_facility_level_data_date
)

training_data = (
nssp_training_data = (
pl.concat([state_level_data, aggregated_facility_data])
.filter(pl.col("date") <= last_training_date)
.with_columns(pl.lit("train").alias("data_type"))
.sort(["date", "disease"])
)

verify_no_date_gaps(training_data)
verify_no_date_gaps(nssp_training_data)

nhsn_training_data = get_nhsn(
start_date=first_training_date,
end_date=last_training_date,
disease=disease,
state_abb=state_abb,
)

nssp_training_dates = (
nssp_training_data.get_column("date").unique().to_list()
)
nhsn_training_dates = (
nhsn_training_data.get_column("weekendingdate").unique().to_list()
)

nhsn_first_date_index = next(
i
for i, x in enumerate(nssp_training_dates)
if x == min(nhsn_training_dates)
)
nhsn_step_size = 7

train_disease_ed_visits = (
training_data.filter(pl.col("disease") == disease)
nssp_training_data.filter(pl.col("disease") == disease)
.get_column("ed_visits")
.to_list()
)

train_total_ed_visits = (
training_data.filter(pl.col("disease") == "Total")
nssp_training_data.filter(pl.col("disease") == "Total")
.get_column("ed_visits")
.to_list()
)

train_disease_hospital_admissions = nhsn_training_data.get_column(
"hospital_admissions"
).to_list()

data_for_model_fit = {
"inf_to_ed_pmf": delay_pmf,
"generation_interval_pmf": generation_interval_pmf,
"right_truncation_pmf": right_truncation_pmf,
"data_observed_disease_ed_visits": train_disease_ed_visits,
"data_observed_total_hospital_admissions": train_total_ed_visits,
"data_observed_disease_hospital_admissions": train_disease_hospital_admissions,
"nssp_training_dates": nssp_training_dates,
"nhsn_training_dates": nhsn_training_dates,
"nhsn_first_date_index": nhsn_first_date_index,
"nhsn_step_size": nhsn_step_size,
"state_pop": state_pop,
"right_truncation_offset": right_truncation_offset,
}
data_dir = Path(model_run_dir, "data")
os.makedirs(data_dir, exist_ok=True)

with open(Path(data_dir, "data_for_model_fit.json"), "w") as json_file:
json.dump(data_for_model_fit, json_file, default=str)

nssp_training_data_long = nssp_training_data.unpivot(
on="ed_visits",
index=cs.exclude("ed_visits"),
variable_name="value_type",
)

nhsn_training_data_long = (
nhsn_training_data.rename(
{"weekendingdate": "date", "jurisdiction": "geo_value"}
)
.unpivot(
on="hospital_admissions",
index=cs.exclude("hospital_admissions"),
variable_name="value_type",
)
.with_columns(
pl.lit(disease).alias("disease"),
pl.lit("train").alias("data_type"),
)
)

combined_training_dat = pl.concat(
[nssp_training_data_long, nhsn_training_data_long],
how="diagonal_relaxed",
).sort(["date", "geo_value", "value_type"])

if logger is not None:
logger.info(f"Saving {state_abb} to {data_dir}")
training_data.write_csv(Path(data_dir, "data.tsv"), separator="\t")

with open(Path(data_dir, "data_for_model_fit.json"), "w") as json_file:
json.dump(data_for_model_fit, json_file)

# post processing not yet updated for combined nhsn and nssp data
nssp_training_data.write_csv(Path(data_dir, "data.tsv"), separator="\t")
combined_training_dat.write_csv(
Path(data_dir, "combined_training_data.tsv"), separator="\t"
)
return None
68 changes: 0 additions & 68 deletions pipelines/pull_nhsn.R

This file was deleted.

0 comments on commit 1d71aca

Please sign in to comment.