Skip to content

Commit

Permalink
Update ingest stage to use noctua unload = TRUE option (#242)
Browse files Browse the repository at this point in the history
* Add ingest script

* Update renv

* Bump renv, R versions

* Add renv reqs

* Switch to helper function

* Pass lintr

* Fix lintr

* lintr

* styler

* Add selector prefix

* Bump noctua version in dev renv profile

* Revert "Add renv reqs"

This reverts commit e50924e.

* Revert "Bump renv, R versions"

This reverts commit 7b299b1.

* Fix spacing

---------

Co-authored-by: Sweaty Handshake <william.ridgeway@cookcountyil.gov>
Co-authored-by: William Ridgeway <10358980+wrridgeway@users.noreply.github.com>
Co-authored-by: Dan Snow <daniel.snow@cookcountyil.gov>
  • Loading branch information
4 people authored Jun 12, 2024
1 parent 2e92c12 commit cc41a1a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ Depends:
Remotes:
ccao-data/assessr,
ccao-data/ccao,
ccao-data/lightsnip
ccao-data/lightsnip,
DyfanJones/noctua
Config/renv/profiles/dev/dependencies:
commonmark,
DBI,
Expand Down
54 changes: 40 additions & 14 deletions pipeline/00-ingest.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ suppressPackageStartupMessages({
library(noctua)
})

# Adds arrow support to speed up ingest process.
noctua_options(unload = TRUE)

# Establish Athena connection
AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena())

Expand Down Expand Up @@ -145,7 +148,6 @@ recode_column_type <- function(col, col_name, dict = col_type_dict) {
col_type <- dict %>%
filter(var_name == col_name) %>%
pull(var_type)

switch(col_type,
numeric = as.numeric(col),
character = as.character(col),
Expand All @@ -156,6 +158,30 @@ recode_column_type <- function(col, col_name, dict = col_type_dict) {
}


# Mini function to deal with arrays
# Some Athena columns are stored as arrays but are converted to string on
# ingest. In such cases, we either keep the contents of the cell (if 1 unit),
# collapse the array into a comma-separated string (if more than 1 unit),
# or replace with NA if the array is empty
process_array_columns <- function(data, selector) {
data %>%
mutate(
across(
selector,
~ sapply(.x, function(cell) {
if (length(cell) > 1) {
paste(cell, collapse = ", ")
} else if (length(cell) == 1) {
as.character(cell) # Convert the single element to character
} else {
NA # Handle cases where the array is empty
}
})
)
)
}




#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Expand Down Expand Up @@ -290,6 +316,12 @@ training_data_clean <- training_data_w_hie %>%
),
char_ncu = ifelse(char_class == "212" & !is.na(char_ncu), char_ncu, 0)
) %>%
# Apply the helper function to process array columns
process_array_columns(starts_with("loc_tax_")) %>%
mutate(
loc_tax_municipality_name =
replace_na(loc_tax_municipality_name, "UNINCORPORATED")
) %>%
# Coerce columns to the data types recorded in the dictionary. Necessary
# because the SQL drivers will often coerce types on pull (boolean becomes
# character)
Expand All @@ -303,14 +335,7 @@ training_data_clean <- training_data_w_hie %>%
sv_is_outlier = replace_na(sv_is_outlier, FALSE),
sv_outlier_type = replace_na(sv_outlier_type, "Not outlier")
) %>%
# Some Athena columns are stored as arrays but are converted to string on
# ingest. In such cases, take the first element and clean the string
mutate(
across(starts_with("loc_tax_"), \(x) str_replace_all(x, "\\[|\\]", "")),
across(starts_with("loc_tax_"), \(x) str_trim(str_split_i(x, ",", 1))),
across(starts_with("loc_tax_"), \(x) na_if(x, "")),
loc_tax_municipality_name =
replace_na(loc_tax_municipality_name, "UNINCORPORATED"),
# Miscellaneous column-level cleanup
ccao_is_corner_lot = replace_na(ccao_is_corner_lot, FALSE),
ccao_is_active_exe_homeowner = replace_na(ccao_is_active_exe_homeowner, 0L),
Expand Down Expand Up @@ -395,6 +420,12 @@ assessment_data_clean <- assessment_data_w_hie %>%
type = "short",
as_factor = FALSE
) %>%
# Apply the helper function to process array columns
process_array_columns(starts_with("loc_tax_")) %>%
mutate(
loc_tax_municipality_name =
replace_na(loc_tax_municipality_name, "UNINCORPORATED")
) %>%
mutate(
char_apts = case_when(
char_class %in% c("211", "212") & !is.na(char_apts) ~ char_apts,
Expand All @@ -411,13 +442,8 @@ assessment_data_clean <- assessment_data_w_hie %>%
any_of(col_type_dict$var_name),
~ recode_column_type(.x, cur_column())
)) %>%
# Same Athena string cleaning and feature cleanup as the training data
# Same feature cleanup as the training data
mutate(
across(starts_with("loc_tax_"), \(x) str_replace_all(x, "\\[|\\]", "")),
across(starts_with("loc_tax_"), \(x) str_trim(str_split_i(x, ",", 1))),
across(starts_with("loc_tax_"), \(x) na_if(x, "")),
loc_tax_municipality_name =
replace_na(loc_tax_municipality_name, "UNINCORPORATED"),
ccao_is_corner_lot = replace_na(ccao_is_corner_lot, FALSE),
ccao_is_active_exe_homeowner = replace_na(ccao_is_active_exe_homeowner, 0L),
ccao_n_years_exe_homeowner = replace_na(ccao_n_years_exe_homeowner, 0L),
Expand Down
12 changes: 6 additions & 6 deletions renv.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1126,23 +1126,23 @@
},
"paws.analytics": {
"Package": "paws.analytics",
"Version": "0.5.0",
"Version": "0.6.0",
"Source": "Repository",
"Repository": "RSPM",
"Repository": "CRAN",
"Requirements": [
"paws.common"
],
"Hash": "ce1b08551d30927f07cf980fbcd624b9"
"Hash": "d8ea9a4dfefe8a49eceff027e053aa2f"
},
"paws.application.integration": {
"Package": "paws.application.integration",
"Version": "0.5.0",
"Version": "0.6.0",
"Source": "Repository",
"Repository": "RSPM",
"Repository": "CRAN",
"Requirements": [
"paws.common"
],
"Hash": "f50fab812dd899553df4313532636028"
"Hash": "4a768547ec143a4a0742b33929010da4"
},
"paws.common": {
"Package": "paws.common",
Expand Down
17 changes: 11 additions & 6 deletions renv/profiles/dev/renv.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"R": {
"Version": "4.3.2",
"Version": "4.4.0",
"Repositories": [
{
"Name": "CRAN",
Expand Down Expand Up @@ -435,8 +435,13 @@
"noctua": {
"Package": "noctua",
"Version": "2.6.2",
"Source": "Repository",
"Repository": "CRAN",
"Source": "GitHub",
"RemoteType": "github",
"RemoteHost": "api.github.com",
"RemoteUsername": "DyfanJones",
"RemoteRepo": "noctua",
"RemoteRef": "master",
"RemoteSha": "23a4cfbf537407c7a1547fc13ba771ba2eb098e0",
"Requirements": [
"DBI",
"R",
Expand All @@ -447,7 +452,7 @@
"utils",
"uuid"
],
"Hash": "c03d73125d695e80b35b4bb3eacf0358"
"Hash": "a48e1decdd027c44ea6b97b0fe0950cb"
},
"openxlsx": {
"Package": "openxlsx",
Expand Down Expand Up @@ -552,13 +557,13 @@
},
"renv": {
"Package": "renv",
"Version": "1.0.3",
"Version": "1.0.7",
"Source": "Repository",
"Repository": "CRAN",
"Requirements": [
"utils"
],
"Hash": "41b847654f567341725473431dd0d5ab"
"Hash": "397b7b2a265bc5a7a06852524dabae20"
},
"rlang": {
"Package": "rlang",
Expand Down

0 comments on commit cc41a1a

Please sign in to comment.