소스 검색

snapshot

main
Garrick Aden-Buie 2 년 전
부모
커밋
f269b4914e
No known key found for this signature in database
19개의 변경된 파일1323개의 추가작업 그리고 98개의 파일을 삭제
  1. +2
    -0
      process/.gitignore
  2. +4
    -0
      process/DESCRIPTION
  3. +39
    -0
      process/R/aaa.R
  4. +69
    -0
      process/R/donations.R
  5. +66
    -0
      process/R/open_dataset.R
  6. +49
    -0
      process/R/out-cover-committees.R
  7. +72
    -0
      process/R/out-expenses.R
  8. +194
    -0
      process/R/out_candidate_listing.R
  9. +70
    -0
      process/R/out_receipts.R
  10. +0
    -56
      process/R/prep_open_dataset.R
  11. +174
    -9
      process/R/prepare_candidates.R
  12. +262
    -0
      process/R/prepare_donors.R
  13. +9
    -0
      process/R/prepare_report_list.R
  14. +13
    -6
      process/R/utils-address.R
  15. +9
    -0
      process/R/utils-parquet.R
  16. +4
    -0
      process/R/utils-path.R
  17. +168
    -5
      process/_targets.R
  18. +104
    -18
      process/_targets/meta/meta
  19. +15
    -4
      process/run.R

+ 2
- 0
process/.gitignore 파일 보기

@@ -0,0 +1,2 @@
data-out
*.log

+ 4
- 0
process/DESCRIPTION 파일 보기

@@ -23,11 +23,15 @@ Depends:
Imports:
arrow,
cli,
clustringr,
crew,
DBI,
dbplyr,
desc,
docopt,
duckdb,
fastLink,
here,
lgr,
tidyr,
visNetwork

+ 39
- 0
process/R/aaa.R 파일 보기

@@ -0,0 +1,39 @@
lg_new_logger <- function() {
lg <- lgr::LoggerGlue$new("nc-campaign-finance")
if (!interactive()) {
lg$add_appender(
lgr::AppenderFile$new(here::here("nc-campaign-finance.log")),
name = "file"
)
lg$set_threshold("info")
}
lg
}

lg_get_logger <- function() {
if (is.null(.globals$logger)) {
.globals$logger <- lg_new_logger()
}

.globals$logger
}

lg_inject_target_name <- function(lg = .globals$logger) {
target_name <- targets::tar_name()
if (target_name == "target") return()
lg$add_filter(lgr::FilterInject$new(target = target_name), name = "target_name")
withr::defer_parent(lg$remove_filter("target_name"))
invisible(lg)
}

lg_info_target <- function(lg = .globals$logger) {
target_name <- targets::tar_name()
if (target_name == "target") return()

lg$info("------ start: {target_name} ------")
withr::defer_parent({
lg$info("------ done: {target_name} ------")
})

invisible(lg)
}

+ 69
- 0
process/R/donations.R 파일 보기

@@ -0,0 +1,69 @@
donations_from_receipts <- function(
receipts,
addresses_out = NULL,
receipt_type_code = c("IND", "CPCM", "GEN", "PPTY", "OUTS", "NFPC"),
exclude_org_name = c("AGGREGATED INDIVIDUAL CONTRIBUTION", "VARIOUS VARIOUS")
) {
if (is.character(receipts)) {
receipts <- prep_open_dataset_db(receipts)
}

if (!is.null(addresses_out) && is.character(addresses_out)) {
addresses <- out_open_dataset_db(addresses_out)
}

ret <-
receipts |>
filter(
# Keep individual/party donors; drop record keeping things, like refunds
receipt_type_code %in% !!receipt_type_code,
!toupper(org_name) %in% !!exclude_org_name
) |>
rename(donor_name = org_name) |>
# address lookup is the key for matching with the resolved addresses db
add_address_lookup(name = "donor_address_raw") |>
select(
sboe_id,
report_id,
donor_name,
donor_address_raw,
amount,
profession,
employers_name,
form_of_payment_desc,
everything()
)

if (is.null(addresses_out)) return(ret)

ret |>
left_join(
addresses |> select(1:2),
by = join_by(donor_address_raw == address_lookup)
) |>
rename(donor_address = address_resolved) |>
relocate(donor_address, .after = donor_name) |>
mutate(donor_address = coalesce(donor_address, !!fixup_po_box_query(donor_address_raw)))
}

count_receipt_types <- function(receipts) {
# # A tibble: 13 × 3
# receipt_type_code receipt_type_desc n
# <chr> <chr> <dbl>
# 1 IND Individual Contribution 7425756
# 2 CPCM Other Political Committee Contribution 78354
# 3 GEN General Contribution 65692
# 4 PPTY Party Contribution 27623
# 5 OUTS Outside Source 16415
# 6 INT Interest Earned 10308
# 7 RFND Refund/Reimbursement to the Committee 7353
# 8 CNRE Contribution to be Reimbursed 3057
# 9 EPPS Exempt Party Price Sale 685
# 10 NFPC Not for Profit Contribution 628
# 11 DON Donation 28
# 12 DEBT Debt Payment 4
# 13 GNS Goods and Services 1
#
receipts |>
count(receipt_type_code, receipt_type_desc, sort = TRUE)
}

+ 66
- 0
process/R/open_dataset.R 파일 보기

@@ -0,0 +1,66 @@
prep_open_dataset <- function(path_prep, partitioning = "sboe_id", ...) {
cf_open_dataset(path_prep, partitioning = partitioning, ..., dir = "data-prep")
}

out_open_dataset <- function(path_out, partitioning = "sboe_id", ...) {
cf_open_dataset(path_out, partitioning = partitioning, ..., dir = "data-out")
}

cf_open_dataset <- function(path, partitioning = "sboe_id", ..., dir = "data-prep") {
path <- resolve_path_up_2(path, dir = dir)

if (length(fs::dir_ls(path, type = "dir")) == 0) {
partitioning <- NULL
}

arrow::open_dataset(path, partitioning = partitioning, ...)
}

prep_open_dataset_db <- function(table, ..., path_prep = table) {
cf_open_dataset_db(table, ..., path = path_prep, dir = "data-prep")
}

out_open_dataset_db <- function(table, ..., path_out = table) {
cf_open_dataset_db(table, ..., path = path_out, dir = "data-out")
}

cf_open_dataset_db <- function(table, ..., path = table, dir = "data-prep") {
pq <- cf_open_dataset(path, ..., dir = dir)

con <- duckdb_global_con()
duckdb::duckdb_register_arrow(con, table, pq)
dplyr::tbl(con, table)
}

prep_open_address_db <- function(
path_db = "address_lookup.sqlite"
) {
path_db <- resolve_path_up_2(path_db)

con <- if (!is.null(.globals$con_address)) {
.globals$con_address
} else {
.globals$con_address <- DBI::dbConnect(RSQLite::SQLite(), path_db)
}

tbl(con, "resolved")
}

# Utils ----
resolve_path_up_2 <- function(path, dir = "data-prep") {
if (fs::file_exists(path)) {
return(path)
}

path_here <- here::here(dir, path)
path_wd <- fs::path(dir, path)
path_up <- fs::path("..", dir, path)
path_up2 <- fs::path("..", "..", dir, path)

if (fs::file_exists(path_wd)) return(path_wd)
if (fs::file_exists(path_here)) return(path_here)
if (fs::file_exists(path_up)) return(path_up)
if (fs::file_exists(path_up2)) return(path_up2)

stop("File not found: ", path)
}

+ 49
- 0
process/R/out-cover-committees.R 파일 보기

@@ -0,0 +1,49 @@
out_cover <- function(
path_data_prep_cover,
path_out_report_list
) {
lg_info_target(lg_get_logger())

out <- path("data-out", "cover", "cover.parquet")
dir_create(path_dir(out))

report_list <- out_open_dataset_db(path_out_report_list)

cover <-
prep_open_dataset_db(path_data_prep_cover) |>
semi_join(report_list, by = "report_id") |>
collect()

arrow::write_parquet(cover, out)

dirname(out)
}

out_committees <- function(path_out_cover) {
out <- path("data-out", "committees", "committees.parquet")
dir_create(path_dir(out))

cover <- out_open_dataset_db(path_out_cover)

committee_name_address <-
cover |>
distinct(sboe_id, committee_name, street_1, street_2, city, state, zip_code) |>
add_address_lookup(postal_code = zip_code)

committee_type <-
cover |>
group_by(sboe_id) |>
count(committee_type, fund_type, fund_name) |>
slice_max(n, n = 1) |>
ungroup() |>
select(-n)

committees <-
committee_name_address |>
left_join(committee_type, by = "sboe_id") |>
collect()

arrow::write_parquet(committees, out)

dirname(out)
}

+ 72
- 0
process/R/out-expenses.R 파일 보기

@@ -0,0 +1,72 @@
out_expenses_payee <- function(
path_data_prep_expenditures,
path_out_report_list
) {
lg_info_target(lg_get_logger())

out <- path("data-out", "expenses_payee", "expenses_payee.parquet")
dir_create(path_dir(out))

report_list <- out_open_dataset_db(path_out_report_list)

expenses <-
prep_open_dataset_db(path_data_prep_expenditures) |>
semi_join(report_list, by = "report_id")

expenses_payee <-
expenses |>
distinct(
org_name,
is_org,
is_us,
profession,
employers_name,
street_1,
street_2,
city,
state,
full_zip,
country_name
) |>
add_address_lookup(postal_code = full_zip, name = "address_lookup") |>
mutate(payee_id = row_number(), .before = 1)

arrow::write_parquet(collect(expenses_payee), out)

dirname(out)
}

out_expenses <- function(
path_data_prep_expenditures,
path_out_expenses_payee,
path_out_report_list
) {
lg_info_target(lg_get_logger())

out <- path("data-out", "expenses", "expenses.parquet")
dir_create(path_dir(out))

report_list <- out_open_dataset_db(path_out_report_list)
expenses_payee <-
out_open_dataset_db(path_out_expenses_payee) |>
select(-address_lookup)

expenses <-
prep_open_dataset_db(path_data_prep_expenditures) |>
semi_join(report_list, by = "report_id") |>
select(-name_sort)

# Replace payee info with payee_id
cols_payee_common <- intersect(colnames(expenses_payee), colnames(expenses))

expenses <-
expenses |>
left_join(expenses_payee, by = cols_payee_common) |>
select(-one_of(cols_payee_common)) |>
relocate(payee_id, .after = report_id) |>
collect()

arrow::write_parquet(expenses, out)

dirname(out)
}

+ 194
- 0
process/R/out_candidate_listing.R 파일 보기

@@ -0,0 +1,194 @@
prep_candidate_listing <- function(
path_candidate_listing_raw
) {

candidate_listing_contest <-
prep_open_dataset_db(fs::path_dir(path_candidate_listing_raw)) |>
filter(name_on_ballot != "No Preference") |>
tidyr::replace_na(list(
first_name = "",
middle_name = "",
last_name = "",
name_suffix_lbl = ""
))

# candidate names ----
# Extract candidate names, these will be primary keys for the candidates table
candidate_names <-
candidate_listing_contest |>
dbplyr::window_order(last_name, first_name, middle_name) |>
distinct(first_name, middle_name, last_name, name_suffix_lbl) |>
mutate(candidate_id = row_number(), .before = 1)

# candidate_name_on_ballot ----
candidate_name_on_ballot <-
extract_candidate_info(
candidate_listing_contest,
candidate_names,
info_vars = c("name_on_ballot")
) |>
collect()

# candidate_address ----
candidate_address <-
candidate_listing_contest |>
mutate(
phone = coalesce(phone, office_phone, business_phone),
street_address = toupper(street_address),
street_address = REGEXP_REPLACE(street_address, " +", " ", "g"),
street_address = trimws(street_address),
) |>
select(-office_phone, -business_phone) |>
extract_candidate_info(
candidate_names,
info_vars = c("street_address", "city", "state", "zip_code", "phone", "email")
) |>
collect() |>
rename(street = street_address)

# candidate_party ----
candidate_party <-
candidate_listing_contest |>
extract_candidate_info(
candidate_names,
info_vars = c("party_candidate")
) |>
collect() |>
mutate(party_candidate = forcats::fct_inorder(party_candidate))

# Extract contests (remaining data in candidate_listing) ----
cols_candidate_id <- intersect(colnames(candidate_names), colnames(candidate_listing_contest))
cols_related <- setdiff(
c(colnames(candidate_name_on_ballot), colnames(candidate_address), colnames(candidate_party)),
"election_dt"
)

contests <-
candidate_listing_contest |>
select(election_dt:name_suffix_lbl) |>
select(-any_of(cols_related)) |>
left_join(candidate_names, by = cols_candidate_id) |>
relocate(candidate_id, .before = first_name) |>
collect()

# Get current complete contact information ----
candidate_contact_current <-
candidate_listing_current_contact_info(candidate_address)

# Join candidates into one big table ----
candidates <-
candidate_names |>
collect() |>
left_join(
candidate_name_on_ballot |>
slice_max(election_dt, by = candidate_id, n = 1) |>
select(-election_dt),
by = "candidate_id",
relationship = "one-to-one"
) |>
left_join(
candidate_contact_current,
by = "candidate_id",
relationship = "one-to-one"
) |>
left_join(
candidate_party |>
filter(!is.na(party_candidate)) |>
group_by(candidate_id) |>
slice_max(election_dt, n = 1) |>
arrange(party_candidate) |>
slice_head(n = 1) |>
ungroup() |>
select(-election_dt) |>
rename(party_last = party_candidate),
by = "candidate_id",
relationship = "one-to-one"
) |>
left_join(
candidate_party |>
filter(!is.na(party_candidate)) |>
group_by(candidate_id) |>
count(party_candidate) |>
slice_max(n, n = 1) |>
arrange(party_candidate) |>
slice_head(n = 1) |>
ungroup() |>
select(-n) |>
rename(party_most = party_candidate),
by = "candidate_id",
relationship = "one-to-one"
) |>
left_join(
contests |>
group_by(candidate_id) |>
distinct(candidate_id, election_dt, contest_name) |>
summarize(
contest_n = n(),
contest_first = min(election_dt),
contest_latest = max(election_dt)
),
by = "candidate_id",
relationship = "one-to-one"
) |>
relocate(name_on_ballot, .before = first_name) |>
relocate(starts_with("party"), .before = street) |>
relocate(starts_with("contest"), .before = street) |>
add_address_lookup(street = street, postal_code = zip_code)

# Return list of tables
list(
elections = contests,
candidates = candidates,
candidate_name_on_ballot = candidate_name_on_ballot,
candidate_contact = candidate_address,
candidate_party = candidate_party
)
}

extract_candidate_info <- function(
candidate_listing_contest,
candidate_names,
info_vars
) {
candidate_listing_contest |>
select(first_name:name_suffix_lbl, election_dt, all_of(info_vars)) |>
distinct() |>
left_join(
candidate_names,
by = c("first_name", "middle_name", "last_name", "name_suffix_lbl")
) |>
select(-first_name, -middle_name, -last_name, -name_suffix_lbl) |>
relocate(candidate_id, .before = 1) |>
distinct() |>
arrange(candidate_id, election_dt, !!info_vars)
}

candidate_listing_current_contact_info <- function(candidate_address) {
candidate_phone_current <-
candidate_address |>
filter(!is.na(phone)) |>
slice_max(election_dt, by = candidate_id, n = 1, with_ties = FALSE) |>
select(candidate_id, phone)

candidate_email_current <-
candidate_address |>
filter(!is.na(email)) |>
slice_max(election_dt, by = candidate_id, n = 1, with_ties = FALSE) |>
select(candidate_id, email)

candidate_address_current <-
candidate_address |>
group_by(candidate_id) |>
slice_max(election_dt, n = 1) |>
select(-election_dt) |>
filter(
n_distinct(street) == 1 | !grepl("PO BOX", street)
) |>
slice_head(n = 1) |>
select(candidate_id, street:zip_code) |>
ungroup()

candidate_address_current |>
left_join(candidate_phone_current, by = "candidate_id", relationship = "one-to-one") |>
left_join(candidate_email_current, by = "candidate_id", relationship = "one-to-one")
}

+ 70
- 0
process/R/out_receipts.R 파일 보기

@@ -0,0 +1,70 @@
out_receipts_payer <- function(
path_data_prep_receipts,
path_out_report_list
) {
lg_info_target(lg_get_logger())

out <- path("data-out", "receipts_payer", "receipts_payer.parquet")
dir_create(path_dir(out))

report_list <- out_open_dataset_db(path_out_report_list)

receipts <-
prep_open_dataset_db(path_data_prep_receipts) |>
semi_join(report_list, by = "report_id")

receipts_payer <-
receipts |>
distinct(
org_name,
is_org,
is_us,
profession,
employers_name,
street_1,
city,
state,
full_zip,
country_name
) |>
add_address_lookup(postal_code = full_zip, name = "address_lookup") |>
mutate(payer_id = row_number(), .before = 1)

arrow::write_parquet(collect(receipts_payer), out)

dirname(out)
}

out_receipts <- function(
path_data_prep_receipts,
path_out_receipts_payer,
path_out_report_list
) {
lg_info_target(lg_get_logger())

out <- path("data-out", "receipts", "receipts.parquet")
dir_create(path_dir(out))

report_list <- out_open_dataset_db(path_out_report_list)
receipts_payer <-
out_open_dataset_db(path_out_receipts_payer) |>
select(-address_lookup)

receipts <-
prep_open_dataset_db(path_data_prep_receipts) |>
semi_join(report_list, by = "report_id")

# Replace payer info with payer_id
cols_payer_common <- intersect(colnames(receipts_payer), colnames(receipts))

receipts <-
receipts |>
left_join(receipts_payer, by = cols_payer_common) |>
select(-one_of(cols_payer_common)) |>
relocate(payer_id, .after = report_id) |>
collect()

arrow::write_parquet(receipts, out)

dirname(out)
}

+ 0
- 56
process/R/prep_open_dataset.R 파일 보기

@@ -1,56 +0,0 @@
prep_open_dataset <- function(path_prep, partitioning = "sboe_id", ...) {
path_prep <- resolve_path_data_prep(path_prep)

if (length(fs::dir_ls(path_prep, type = "dir")) == 0) {
partitioning <- NULL
}

arrow::open_dataset(path_prep, partitioning = partitioning, ...)
}

prep_open_dataset_db <- function(table, ..., path_prep = table) {
pq <- prep_open_dataset(path_prep, ...)

con <- duckdb_global_con()
duckdb::duckdb_register_arrow(con, table, pq)
dplyr::tbl(con, table)
}

prep_open_address_db <- function(
path_db = "address_lookup.sqlite"
) {
path_db <- resolve_path_data_prep(path_db)

con <- if (!is.null(.globals$con_address)) {
.globals$con_address
} else {
.globals$con_address <- DBI::dbConnect(RSQLite::SQLite(), path_db)
}

tbl(con, "resolved")
}

# Utils ----
resolve_path_data_prep <- function(path_prep) {
if (fs::file_exists(path_prep)) {
return(path_prep)
}

path_here <- here::here("data-prep/", path_prep)
path_up <- fs::path("..", "data-prep", path_prep)
path_up2 <- fs::path("..", "..", "data-prep", path_prep)

if (fs::file_exists(path_here)) {
return(path_here)
}

if (fs::file_exists(path_up)) {
return(path_up)
}

if (fs::file_exists(path_up2)) {
return(path_up2)
}

stop("File not found: ", path_prep)
}

+ 174
- 9
process/R/prepare_candidates.R 파일 보기

@@ -1,15 +1,180 @@
prepare_candidates <- function(path_officers = "../data-prep/officers", report_list = tar_read(report_list)) {
officers_pq <- prep_open_dataset(path_officers)
prepare_candidates_for_matching <- function(
path_data_prep_officers,
path_out_report_list,
path_addresses = "data-out/addresses"
) {
lg_info_target(lg_get_logger())

officers <- officers_pq |> filter(type == "Candidate") |> collect()
report_list <- out_open_dataset_db(path_out_report_list)
addresses <- out_open_dataset_db(path_addresses)

officers |>
candidates <-
out_open_dataset_db(path_data_prep_officers) |>
semi_join(report_list, by = "report_id") |>
filter(type == "Candidate") |>
filter(!is.na(name)) |>
mutate(
name_display = name,
name = toupper(name)
name_clean = toupper(name),
name_clean = REGEXP_REPLACE(name_clean, " FOR .+$", "", "g"),
name_clean = REGEXP_REPLACE(name_clean, "\\s+", " ", "g")
) |>
distinct(sboe_id, name, .keep_all = TRUE) |>
semi_join(report_list, by = "report_id")
distinct(sboe_id, name_clean, address) |>
rename(address_raw = address) |>
left_join(
addresses |> select(1:2),
by = join_by(address_raw == address_lookup)
) |>
rename(address = address_resolved)


# Add new rows with aliases for people like `"ROLLANDE \"ROLIE\" SAMPSON"`
candidates_aliases <-
candidates |>
filter(grepl('"[A-Z]+"', name_clean)) |>
mutate(
name_clean = REGEXP_REPLACE(name_clean, '.+? "([A-Z]+)"(.+)$', "\\1 \\2", "g"),
name_clean = REGEXP_REPLACE(name_clean, "[^A-Z ]", "", "g"),
name_clean = REGEXP_REPLACE(name_clean, "\\s+", " ", "g"),
name_clean = REGEXP_REPLACE(name_clean, "^\\s+|\\s+$", "", "g")
) |>
collect()

candidates <- candidates |>
mutate(
name_clean = REGEXP_REPLACE(name_clean, "[^A-Z ]", "", "g"),
name_clean = REGEXP_REPLACE(name_clean, "\\s+", " ", "g"),
name_clean = REGEXP_REPLACE(name_clean, "^\\s+|\\s+$", "", "g")
) |>
collect()

candidates |>
bind_rows(candidates_aliases) |>
filter(!is.na(name_clean)) |>
tidyr::extract(
address,
c("street", "city", "state", "postal_code"),
"(.+), (.+), ([A-Z]{2}), (\\d{5})"
) |>
mutate(
street = if_else(!grepl("\\d", street), NA_character_, street),
)
}

prepare_candidate_listing_for_matching <- function(candidate_listing) {
lg_info_target(lg_get_logger())

cl <-
candidate_listing |>
mutate(
name_clean = toupper(name_on_ballot),
name_clean = gsub("[^A-Z ]", "", name_clean)
) |>
distinct(
name_on_ballot, name_clean,
street = street_address,
city, state,
postal_code = zip_code
)
}

fastlink_candidates <- function(candidates_for_matching, candidate_listing_for_matching) {
lg_info_target(lg_get_logger())

fastLink::fastLink(
candidates_for_matching,
candidate_listing_for_matching,
varnames = c("name_clean", "street", "city"),
stringdist.match = c("name_clean", "street"),
partial.match = c("name_clean", "street"),
stringdist.method = "dl"
)
}

fastlink_match_candidates <- function(
candidates_for_matching,
candidate_listing_for_matching,
candidates_linked
) {
lg_info_target(lg_get_logger())

matches <- fastLink::getMatches(
candidates_for_matching,
candidate_listing_for_matching,
candidates_linked,
threshold.match = 0.8
)

names(matches)[ncol(candidates_for_matching) + 1] <- "name_on_ballot"

candidates_for_matching |>
left_join(
matches |> select(sboe_id, name_on_ballot),
by = join_by(sboe_id == sboe_id)
)
}

candidates_match <- function(
candidates_for_matching,
candidate_listing_for_matching,
candidates_linked
) {
# First, direct matches
candidates_matched_1 <-
candidates_for_matching |>
inner_join(
candidate_listing_for_matching |>
select(name_on_ballot, name_clean, street, city),
by = join_by(
name_clean == name_clean,
street == street,
city == city
),
relationship = "many-to-many"
) |>
distinct(sboe_id, name_on_ballot)

# Then unambiguous matches on street + city
matches_street_city <-
candidates_for_matching |>
anti_join(candidates_matched_1, by = "sboe_id") |>
inner_join(
candidate_listing_for_matching |>
select(name_on_ballot, street, city),
by = join_by(
street == street,
city == city
),
relationship = "many-to-many"
) |>
distinct(sboe_id, name_on_ballot, street, city) |>
group_by(street, city) |>
mutate(n_names = n_distinct(name_on_ballot)) |>
ungroup() |>
filter(n_names == 1) |>
select(sboe_id, name_on_ballot)

candidates_matched_2 <- union(candidates_matched_1, matches_street_city)

candidates_for_matching_left <-
candidates_for_matching |>
anti_join(candidates_matched_2, by = "sboe_id")

# Now fuzzyjoin...
candidates_fuzzy_name <-
zoomerjoin::jaccard_inner_join(
candidates_for_matching_left,
candidate_listing_for_matching |>
select(name_on_ballot, name_clean, street, city),
by = "name_clean",
threshold = 0.85
)

candidates_fuzzy_name |>
distinct(sboe_id, name_on_ballot) |>
group_by(sboe_id) |>
mutate(n_names = n_distinct(name_on_ballot)) |>
ungroup() |>
filter(n_names != 1) |>
arrange(desc(n_names), sboe_id)


}

+ 262
- 0
process/R/prepare_donors.R 파일 보기

@@ -0,0 +1,262 @@
prepare_donors_latest <- function(
path_data_prep_receipts,
path_out_report_list,
path_addresses = "data-out/addresses"
) {
report_list <- out_open_dataset_db(path_out_report_list)

donations <-
donations_from_receipts(
receipts = path_data_prep_receipts,
addresses_out = path_addresses
) |>
semi_join(report_list, by = "report_id")

donations_prep <-
donations |>
mutate(
donor_name_clean = toupper(donor_name),
donor_name_clean = if_else(
grepl(" (INC|LLC|PAC)", donor_name_clean),
donor_name_clean,
# attempt to fix LAST, FIRST NAME issues
REGEXP_REPLACE(donor_name_clean, "^([A-Z]{2, }), (.+)", "\\2 \\1", "g")
),
# "O'BRIEN -> OBRIEN" plus other chars that should be non-spaced
donor_name_clean = REGEXP_REPLACE(donor_name_clean, "['&*\".()]", "", "g"),
# remove anything that isn't A-Z, space, period or parens
donor_name_clean = REGEXP_REPLACE(donor_name_clean, "[^A-Z0-9 #:;?/]", " ", "g"),
donor_name_clean = REGEXP_REPLACE(donor_name_clean, " +", " ", "g"),
# get last name, but remove suffixes
donor_name_last = REGEXP_REPLACE(donor_name_clean, " (II|III|IV|[JS]R[.]?)", "", "g"),
donor_name_last = REGEXP_REPLACE(donor_name_clean, "^ +| +$", "", "g"),
donor_name_last = RIGHT(donor_name_last, INSTR(REVERSE(donor_name_last), ' ') - 1L),
donor_name_first_init = LEFT(donor_name_clean, 1L),
) |>
relocate(donor_name_clean, donor_name_last, donor_name_first_init, .after = donor_name)

donors_latest_tbl <-
donations_prep |>
group_by(donor_name_clean, donor_address) |>
slice_max(occur_date, report_id, n = 1, with_ties = FALSE) |>
ungroup() |>
mutate(across(c(profession, employers_name), toupper)) |>
distinct(
donor_name,
donor_name_clean,
donor_name_last,
donor_name_first_init,
donor_address,
donor_address_raw,
profession,
employers_name,
report_id
)

donors_latest_tbl |>
collect() |>
mutate(
donor_id_raw = row_number(),
.before = 1
)
}

prepare_donors_for_matching <- function(donors_latest) {
donors_latest |>
tidyr::extract(
donor_address,
c("street", "city", "state", "postal_code"),
"(.+), (.+), ([A-Z]{2}), (\\d{5})"
) |>
select(
donor_id_raw,
donor_name_clean,
donor_name_first_init, donor_name_last,
street, city, postal_code,
profession, employers_name
) |>
mutate(
# A real address has to have a number in it, right?
street = if_else(!grepl("\\d", street), NA_character_, street),
)
}

prepare_donors_matching_sample <- function(donors_for_matching) {
donors_in_nc <-
donors_for_matching |>
filter(is.na(postal_code) | grepl("^2[78]", postal_code))

donors_outside_nc <- anti_join(donors_for_matching, donors_in_nc, by = "donor_id_raw")

bind_rows(
donors_in_nc |> slice_sample(n = 50000),
donors_outside_nc |> slice_sample(n = 10000)
)
}

prepare_donor_zip_blocks <- function(donors) {
lg_info_target(lg_get_logger())

pc_counts <-
donors |>
count(postal_code) |>
filter(n > 1, !is.na(postal_code))

donors |>
semi_join(pc_counts, by = "postal_code") |>
mutate(
zip_pre = case_when(
# North Carolina zip codes start with 27*** and 28***,
# so we can group into smaller block
grepl("^2[78]", postal_code) ~ substr(postal_code, 1, 3),
# Outside of NC, we'll group into region codes (first two of zip)
TRUE ~ substr(postal_code, 1, 1)
)
) |>
group_nest(zip_pre) |>
mutate(n = map_int(data, nrow)) |>
arrange(desc(n)) |>
select(-n)
}

cluster_text <- function(x, column) {
col_expr <- rlang::enexpr(column)
col_sym <- rlang::sym(rlang::expr_text(col_expr))

val <-
x |>
mutate(
..clean = stringr::str_to_lower(stringr::str_squish(!!col_sym))
)

cluster <- clustringr::cluster_strings(
unique(val$..clean),
max_dist = 1,
)
cluster <- cluster$df_clusters

val |>
left_join(cluster, by = join_by(..clean == node)) |>
select(-..clean, -size) |>
relocate(cluster, .after = !!col_sym) |>
rename("{col_sym}_cluster" := cluster)
}

prepare_donor_city_blocks <- function(donors) {
lg_info_target(lg_get_logger())

donors |>
cluster_text(city) |>
group_nest(city_cluster) |>
mutate(n = map_int(data, nrow)) |>
filter(n > 1) |>
arrange(desc(n)) |>
select(-n)
}

prepare_donor_name_blocks <- function(donors) {
lg_info_target(lg_get_logger())

donors |>
filter(!is.na(donor_name_last)) |>
mutate(
donor_name_last = case_when(
grepl("[^A-Z]", donor_name_last) ~ "other",
TRUE ~ substr(donor_name_last, 1, 1)
)
) |>
group_nest(donor_name_last) |>
mutate(n = map_int(data, nrow)) |>
filter(n > 1) |>
arrange(desc(n)) |>
select(-n)
}


fastlink_donor_blocks <- function(donor_blocks, ..., quiet = FALSE) {
lg <- lg_get_logger()
lg_info_target(lg)

# The goal here is to train once on a subset of the complete data
# and then apply the training to each block

# fastLink(verbose = TRUE) can introduce errors (their fault)
p_fastlink_donors <- partial(fastlink_donors, ..., verbose = TRUE)

if (quiet) {
p_fastlink_donors <- quietly(p_fastlink_donors)
}

# Now apply the model to each block
lg$info("Applying EM model to {nrow(donor_blocks)} blocks...")
donor_blocks$matches <- vector("list", nrow(donor_blocks))

for (i in seq_len(nrow(donor_blocks))) {
block_data <- donor_blocks$data[[i]]
lg$info(" ┌ [{i}] start ({nrow(block_data)} rows)")

res <- p_fastlink_donors(block_data)
donor_blocks$matches[[i]] <- if (quiet) res$result else res

lg$info(" └ [{i}] done")
}
lg$info("Found matches for all blocks")

donor_blocks
}

fastlink_donors_em_model <- function(donors_em_model_sample) {
lg <- lg_get_logger()
lg_info_target(lg)

lg$info("Training fastLink EM on donor data with {nrow(donors_em_model_sample)} rows...")
em_obj <- fastlink_donors(donors_em_model_sample, estimate.only = TRUE)
lg$info("Done: Training fastLink EM")

em_obj
}

fastlink_donors <- function(
donors_for_matching,
...,
verbose = TRUE,
exclude_vars = NUL
) {
donors_for_matching <- janitor::remove_empty(donors_for_matching, "cols")

is_city_constant <- n_distinct(donors_for_matching$city) == 1

match_vars <- setdiff(
names(donors_for_matching),
c(
"donor_id_raw", "postal_code",
if (is_city_constant) "city",
"donor_name_first_init", "donor_name_last"
)
)

if (length(match_vars) == 0) {
return(NULL)
}

str_vars <- c("donor_name_clean", "street", "profession", "employers_name")
str_vars <- intersect(str_vars, match_vars)

fastLink::fastLink(
dfA = donors_for_matching,
dfB = donors_for_matching,
varnames = match_vars,
stringdist.match = str_vars,
partial.match = str_vars,
verbose = verbose,
...
)
}

prepare_donors_fastlink_matches <- function(donors_for_matching, donors_fl, ...) {
fastLink::getMatches(
dfA = donors_for_matching,
dfB = donors_for_matching,
fl.out = donors_fl
)
}

+ 9
- 0
process/R/prepare_report_list.R 파일 보기

@@ -15,3 +15,12 @@ prepare_report_list <- function(path_report_list) {

out
}

out_report_list <- function(report_list) {
out <- path("data-out", "report_list", "report_list.parquet")
dir_create(path_dir(out))

arrow::write_parquet(report_list, out)

dirname(out)
}

+ 13
- 6
process/R/utils-address.R 파일 보기

@@ -1,15 +1,22 @@
fixup_po_box <- function(x) {
gsub(
"P\\s*[.]*\\s*O\\s*[.]*\\s*BOX|POST OFFICE BOX",
"PO BOX",
"(P\\s*[.]*\\s*O\\s*[.]*\\s*B(OX)?|POST OFFICE BOX) #?(\\d+)",
"PO BOX \\3",
x
)
}

REGEXP_REPLACE <- function(pattern, replacement, text, ...) {
}

fixup_po_box_query <- function(x) {
REGEXP_REPLACE(
"P\\s*[.]*\\s*O\\s*[.]*\\s*BOX|POST OFFICE BOX",
"PO BOX",
x
var <- rlang::expr_text(rlang::enexpr(x))
dbplyr::sql(
sprintf(
"REGEXP_REPLACE(%s, '%s', '%s')",
var,
"(P\\s*[.]*\\s*O\\s*[.]*\\s*B(OX)?|POST OFFICE BOX) #?(\\d+)",
"PO BOX \\3"
)
)
}

+ 9
- 0
process/R/utils-parquet.R 파일 보기

@@ -3,3 +3,12 @@ write_parquet <- function(x, path, ...) {
arrow::write_parquet(x, path, ...)
path
}

out_write_parquet <- function(x, name, ...) {
out <- fs::path("data-out", name, name, ext = "parquet")
fs::dir_create(fs::path_dir(out))

arrow::write_parquet(x, out, ...)

fs::path_dir(out)
}

+ 4
- 0
process/R/utils-path.R 파일 보기

@@ -0,0 +1,4 @@
cf_root <- function(...) {
root <- rprojroot::find_root(rprojroot::has_file("0-time-log.csv"))
fs::path(root, ...)
}

+ 168
- 5
process/_targets.R 파일 보기

@@ -126,13 +126,13 @@ list(

tar_target(committees, prepare_committees(cover_raw, report_list)),

tar_target(candidates, prepare_candidates(path_data_prep_officers, report_list)),
tar_target(donations, prepare_donations(path_data_prep_receipts, report_list)),

# Outside data sources -----
tar_target(candidate_listing, get_candidate_listing(2016:2023)),
tar_target(candidate_listing_raw, get_candidate_listing(2016:2023)),
tar_target(
path_candidate_listing,
write_parquet(candidate_listing, "../data-prep/candidate_listing/part-0.parquet"),
path_candidate_listing_raw,
write_parquet(candidate_listing_raw, "../data-prep/candidate_listing/part-0.parquet"),
format = "file"
),

@@ -146,14 +146,177 @@ list(
),


# Donors ------------------------------------------------------------------
# tar_target(
# donors_latest,
# prepare_donors_latest(
# path_data_prep_receipts,
# path_out_report_list,
# path_addresses = "data-out/addresses"
# )
# ),
# tar_target(donors_for_matching, prepare_donors_for_matching(donors_latest)),
# tar_target(donors_for_matching_sample, sample_frac(donors_for_matching, 0.1)),
# tar_target(
# donors_for_matching_sample_nc,
# donors_for_matching |> filter(grepl("^27", postal_code)) |> sample_n(10000)
# ),
#
# tar_target(
# donors_to_match,
# {
# # For testing, use a small sample of donors
# # slice_sample(donors_for_matching, n = 5000)
# donors_for_matching
# }
# ),
#
# # Build the EM matching model from a moderately sized sample
# tar_target(donors_em_model_sample, prepare_donors_matching_sample(donors_to_match)),
# tar_target(donors_em_model, fastlink_donors(donors_em_model_sample, estimate.only = TRUE)),
#
# # Then create blocks of data to match against
# tar_target(donor_blocks_zip_pre, prepare_donor_zip_blocks(donors_to_match)),
# tar_target(donor_blocks_city, prepare_donor_city_blocks(donors_to_match)),
# tar_target(donor_blocks_name, prepare_donor_name_blocks(donors_to_match)),
#
# # Then apply the pre-trained EM model to each block
# tar_target(
# donor_linked_zip_pre,
# fastlink_donor_blocks(donor_blocks_zip_pre, em.obj = donors_em_model)
# ),
# tar_target(
# donor_linked_city,
# fastlink_donor_blocks(donor_blocks_city, em.obj = donors_em_model)
# ),
# tar_target(
# donor_linked_name,
# fastlink_donor_blocks(donor_blocks_name, em.obj = donors_em_model)
# ),


# Candidates --------------------------------------------------------------
tar_target(
candidates_for_matching,
prepare_candidates_for_matching(path_data_prep_officers, path_out_report_list)
),
tar_target(
candidate_listing_for_matching,
prepare_candidate_listing_for_matching(candidate_listing)
),
tar_target(
candidates_linked,
fastlink_candidates(
candidates_for_matching,
candidate_listing_for_matching
)
),
tar_target(
candidates_matched,
fastlink_match_candidates(
candidates_for_matching,
candidate_listing_for_matching,
candidates_linked
)
),

# Candidate Listing -------------------------------------------------------
tar_target(
candidate_listing,
prep_candidate_listing(path_candidate_listing_raw)
),


# Output ------------------------------------------------------------------
tar_target(path_out_report_list, out_report_list(report_list), format = "file"),

tar_target(
path_out_addresses,
# This needs to be run manually, otherwise it doesn't run in {targets}
# and throws an error: `bad value`
out_addresses(path_addresses_db, "data/addresses.parquet"),
out_addresses(path_addresses_db, "data-out/addresses.parquet"),
cue = tar_cue("never"),
format = "file"
),

tar_target(
path_out_cover,
out_cover(path_data_prep_cover, path_out_report_list),
format = "file"
),

tar_target(
path_out_committees,
out_committees(path_out_cover),
format = "file"
),

tar_target(
path_out_expenses_payee,
out_expenses_payee(
path_data_prep_expenditures,
path_out_report_list
),
format = "file"
),

tar_target(
path_out_expenses,
out_expenses(
path_data_prep_expenditures,
path_out_expenses_payee,
path_out_report_list
),
format = "file"
),

tar_target(
path_out_receipts_payer,
out_receipts_payer(
path_data_prep_receipts,
path_out_report_list
),
format = "file"
),

tar_target(
path_out_receipts,
out_receipts(
path_data_prep_receipts,
path_out_receipts_payer,
path_out_report_list
),
format = "file"
),

tar_target(
path_out_elections,
out_write_parquet(candidate_listing$elections, "elections"),
format = "file"
),

tar_target(
path_out_elections_candidates,
out_write_parquet(candidate_listing$candidates, "elections_candidates"),
format = "file"
),

tar_target(
path_out_elections_candidates_alias,
out_write_parquet(candidate_listing$candidate_name_on_ballot, "elections_candidates_alias"),
format = "file"
),

tar_target(
path_out_elections_candidates_contact,
out_write_parquet(candidate_listing$candidate_contact, "elections_candidates_contact"),
format = "file"
),

tar_target(
path_out_elections_candidates_party,
out_write_parquet(candidate_listing$candidate_party, "elections_candidates_party"),
format = "file"
)

)

+ 104
- 18
process/_targets/meta/meta
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
파일 보기


+ 15
- 4
process/run.R 파일 보기

@@ -6,12 +6,13 @@
# to learn about your options.

'usage:
run.R all
run.R target <targets>... [--shortcut --reporter <reporter>]
run.R all [--reporter <reporter> --no-crew]
run.R target <targets>... [--shortcut --reporter <reporter> --no-crew --debug]
run.R -h | --help

options:
--reporter <reporter> Reporter type for `tar_make()` [default: verbose_positives].
--no-crew Disable crew?
-h --help Show this screen' -> doc

library(docopt)
@@ -20,15 +21,25 @@ opts <- docopt(doc)
Sys.setenv("IN_TARGETS" = "true")
Sys.setenv("ALLOW_DOWNLOADS" = "true")

if (opts$debug) {
str(opts)
stop()
}

if (is.null(opts$reporter)) {
opts$reporter <- "verbose_positives"
}

no_crew <- isTRUE(opts$no_crew)
if (no_crew) {
cli::cli_alert_info("Disabling {.field crew}.")
}

if (opts$all) {
cli::cli_alert_info("Running all targets.")
targets::tar_make(reporter = opts$reporter)
targets::tar_make(reporter = opts$reporter, use_crew = !no_crew)
} else {
cli::cli_alert_info("Running targets: {.and {.field {opts$targets}}}")
targets::tar_make(targets::any_of(!!opts$targets), shortcut = opts$shortcut, reporter = opts$reporter)
targets::tar_make(targets::any_of(!!opts$targets), shortcut = opts$shortcut, reporter = opts$reporter, use_crew = !no_crew)
}
# targets::tar_make_clustermq(workers = 2) # nolint

Loading…
취소
저장