| @@ -0,0 +1,27 @@ | |||
#' Export the resolved addresses from the lookup database to a parquet file
#'
#' Reads the `resolved` table via `prep_open_address_db()`, keeps only rows
#' whose `match_indicator` is `"Match"`, drops the `id` column, renames
#' `address` to `address_lookup` and `matched_address` to `address_resolved`,
#' converts the `match_*` columns and `tiger_side` to factors, and writes the
#' result with `arrow::write_parquet()`.
#'
#' If no connection is cached in `.globals$con_address` when this function is
#' entered, the connection opened during the call is closed again on exit.
#' A connection that already existed beforehand is left open for its owner.
#'
#' @param path_addresses_db Path to the address lookup database, passed on to
#'   `prep_open_address_db()`.
#' @param path_out_addresses Path of the parquet file to write. Its parent
#'   directory is created when missing.
#' @return `path_out_addresses`, unchanged.
out_addresses <- function(path_addresses_db, path_out_addresses) {
  if (is.null(.globals$con_address)) {
    # We are about to open the connection ourselves, so we also clean it up.
    withr::defer({
      # The connection may never have been created (e.g. the database file
      # was not found); disconnecting NULL would raise a second error while
      # the first one is still unwinding.
      if (!is.null(.globals$con_address)) {
        DBI::dbDisconnect(.globals$con_address)
      }
      .globals$con_address <- NULL
    })
  }

  addresses <-
    prep_open_address_db(path_addresses_db) |>
    # keep only the addresses that resolved, others aren't useful in output
    dplyr::filter(match_indicator == "Match") |>
    # id column is an artifact of the lookup, the key is "address"
    dplyr::select(-id) |>
    dplyr::rename(
      address_lookup = address,
      address_resolved = matched_address
    ) |>
    dplyr::relocate(address_resolved, .after = address_lookup) |>
    # everything above runs in the database; pull the rows into R from here
    dplyr::collect() |>
    dplyr::mutate(
      dplyr::across(c(dplyr::starts_with("match_"), tiger_side), factor)
    )

  fs::dir_create(fs::path_dir(path_out_addresses))
  arrow::write_parquet(addresses, path_out_addresses, version = "2.6")

  path_out_addresses
}
| @@ -1,18 +1,5 @@ | |||
| prep_open_dataset <- function(path_prep, partitioning = "sboe_id", ...) { | |||
| if (!fs::file_exists(path_prep)) { | |||
| path_here <- here::here("data-prep/", path_prep) | |||
| path_up <- fs::path("..", "data-prep", path_prep) | |||
| path_up2 <- fs::path("..", "..", "data-prep", path_prep) | |||
| if (fs::file_exists(path_here)) { | |||
| path_prep <- path_here | |||
| } else if (fs::file_exists(path_up)) { | |||
| path_prep <- path_up | |||
| } else if (fs::file_exists(path_up2)) { | |||
| path_prep <- path_up2 | |||
| } else { | |||
| stop("File not found: ", path_prep) | |||
| } | |||
| } | |||
| path_prep <- resolve_path_data_prep(path_prep) | |||
| if (length(fs::dir_ls(path_prep, type = "dir")) == 0) { | |||
| partitioning <- NULL | |||
| @@ -28,3 +15,42 @@ prep_open_dataset_db <- function(table, ..., path_prep = table) { | |||
| duckdb::duckdb_register_arrow(con, table, pq) | |||
| dplyr::tbl(con, table) | |||
| } | |||
#' Open the `resolved` table of the address lookup database
#'
#' The SQLite connection is cached in `.globals$con_address` and reused by
#' later calls. Note that a cached, still-open connection is returned as-is,
#' even when `path_db` differs from the path it was opened with.
#'
#' @param path_db Path to the SQLite database. It is resolved with
#'   `resolve_path_data_prep()`, which raises an error if the file cannot be
#'   found.
#' @return A lazy `dplyr::tbl()` pointing at the `resolved` table.
prep_open_address_db <- function(
  path_db = "address_lookup.sqlite"
) {
  path_db <- resolve_path_data_prep(path_db)

  con <- .globals$con_address
  # Only reuse the cached connection while it is still open; a connection
  # that was closed elsewhere would fail on first use.
  if (is.null(con) || !DBI::dbIsValid(con)) {
    con <- DBI::dbConnect(RSQLite::SQLite(), path_db)
    .globals$con_address <- con
  }

  # Namespace-qualified like the rest of the file, so this also works when
  # dplyr is not attached.
  dplyr::tbl(con, "resolved")
}
| # Utils ---- | |||
#' Locate a data-prep file or directory
#'
#' Returns `path_prep` itself when it exists. Otherwise the same relative path
#' is tried, in order, under `here::here("data-prep/")`, `../data-prep` and
#' `../../data-prep`; the first candidate that exists is returned.
#'
#' @param path_prep Path to look for.
#' @return The first existing candidate path.
#' Raises an error ("File not found: ...") when no candidate exists.
resolve_path_data_prep <- function(path_prep) {
  if (!fs::file_exists(path_prep)) {
    # A list (not `c()`) keeps each candidate's own class intact.
    fallbacks <- list(
      here::here("data-prep/", path_prep),
      fs::path("..", "data-prep", path_prep),
      fs::path("..", "..", "data-prep", path_prep)
    )

    for (candidate in fallbacks) {
      if (fs::file_exists(candidate)) {
        return(candidate)
      }
    }

    stop("File not found: ", path_prep)
  }

  path_prep
}
| @@ -12,8 +12,8 @@ tar_option_set( | |||
| # For distributed computing in tar_make(), supply a {crew} controller | |||
| # as discussed at https://books.ropensci.org/targets/crew.html. | |||
| controller = crew::crew_controller_local(workers = 24), | |||
| # debug = "path_receipts_parquet_8d195f7e", | |||
| # cue = tar_cue(mode = "never") | |||
| # debug = "path_out_addresses", | |||
| # cue = tar_cue(mode = "never"), | |||
| error = "null" | |||
| ) | |||
| @@ -114,7 +114,8 @@ list( | |||
| tar_target( | |||
| path_addresses_db, | |||
| prepare_addresses_lookup_db(addresses_raw$address) | |||
| prepare_addresses_lookup_db(addresses_raw$address), | |||
| format = "file" | |||
| ), | |||
| # This report list uses the latest amended report ----- | |||
| @@ -129,13 +130,30 @@ list( | |||
| # Outside data sources ----- | |||
| tar_target(candidate_listing, get_candidate_listing(2016:2023)), | |||
| tar_target(path_candidate_listing, write_parquet(candidate_listing, "../data-prep/candidate_listing/part-0.parquet")), | |||
| tar_target( | |||
| path_candidate_listing, | |||
| write_parquet(candidate_listing, "../data-prep/candidate_listing/part-0.parquet"), | |||
| format = "file" | |||
| ), | |||
| ## Voter registration records | |||
| tar_target(path_voters_txt, voter_statewide_download(), cue = tar_cue("never")), #<< invalidate to get latest | |||
| tar_target( | |||
| path_voters_parquet, | |||
| voter_statewide_convert_parquet(path_voters_txt), | |||
| cue = tar_cue("never") | |||
| cue = tar_cue("never"), | |||
| format = "file" | |||
| ), | |||
| # Output ------------------------------------------------------------------ | |||
| tar_target( | |||
| path_out_addresses, | |||
| # This needs to be run manually, otherwise it doesn't run in {targets} | |||
| # and throws an error: `bad value` | |||
| out_addresses(path_addresses_db, "data/addresses.parquet"), | |||
| cue = tar_cue("never"), | |||
| format = "file" | |||
| ) | |||
| ) | |||
| @@ -7,10 +7,11 @@ | |||
| 'usage: | |||
| run.R all | |||
| run.R target <targets>... [--shortcut] | |||
| run.R target <targets>... [--shortcut --reporter <reporter>] | |||
| run.R -h | --help | |||
| options: | |||
| --reporter <reporter> Reporter type for `tar_make()` [default: verbose_positives]. | |||
| -h --help Show this screen' -> doc | |||
| library(docopt) | |||
| @@ -19,11 +20,15 @@ opts <- docopt(doc) | |||
| Sys.setenv("IN_TARGETS" = "true") | |||
| Sys.setenv("ALLOW_DOWNLOADS" = "true") | |||
| if (is.null(opts$reporter)) { | |||
| opts$reporter <- "verbose_positives" | |||
| } | |||
| if (opts$all) { | |||
| cli::cli_alert_info("Running all targets.") | |||
| targets::tar_make() | |||
| targets::tar_make(reporter = opts$reporter) | |||
| } else { | |||
| cli::cli_alert_info("Running targets: {.and {.field {opts$targets}}}") | |||
| targets::tar_make(targets::any_of(!!opts$targets), shortcut = opts$shortcut) | |||
| targets::tar_make(targets::any_of(!!opts$targets), shortcut = opts$shortcut, reporter = opts$reporter) | |||
| } | |||
| # targets::tar_make_clustermq(workers = 2) # nolint | |||