|
# Read one report export file and parse every section it contains.
#
# The file text is split into sections on blank lines; each non-empty section
# is handed to `read_report_section()`. The per-section results are flattened
# one level, so the return value is a single list of parsed tables (sections
# that returned NULL contribute nothing).
read_report_file <- function(report_path) {
  info <- report_path_info(report_path)
  text <- brio::read_file(report_path)

  # Blank-line separated chunks, trimmed, with empty chunks discarded
  chunks <- trimws(strsplit(text, "\n\r?\n")[[1]])
  chunks <- chunks[nzchar(chunks)]

  parsed <- map(
    chunks,
    read_report_section,
    info = info,
    report_path = report_path
  )

  purrr::flatten(parsed)
}
-
# Snake-case titles of the report sections that are not parsed.
skip_report_sections <- function() {
  # receipts and expenditures are collected separately
  collected_separately <- c("receipts", "expenditures")

  # the debts tables have __problems__
  debts_tables <- c(
    "debts_owed_to_the_committee",
    "debts_owed_by_the_committee"
  )

  c(collected_separately, debts_tables)
}
-
# Parse one titled section of a report export into a named one-element list.
#
# Arguments:
#   section      Text of one section: an upper-case title line, then a CSV
#                header line, then the CSV body.
#   info         Values spliced into `mutate()` and placed as the leading
#                columns of the parsed table.
#   report_path  Path of the source file; passed on to `record_problems()`.
#
# Returns NULL when the title is listed by `skip_report_sections()`; otherwise
# a list of length one, named by the snake_case title, holding the parsed
# table (all columns read as character).
#
# Stops with an error when the section does not start with a title line.
read_report_section <- function(section, info, report_path) {
  title_pattern <- "^[A-Z ]+\n"

  if (!grepl(title_pattern, section)) {
    stop("Expected a title at the start of a section")
  }
  title <- snakecase::to_snake_case(sub("\n.+", "", section))

  if (title %in% skip_report_sections()) {
    return(NULL)
  }

  # Drop the title line; the first remaining line is the CSV header
  body <- sub(title_pattern, "", section)
  header <- strsplit(body, "\n")[[1]][[1]]

  # Separate the header from the body so the body can be repaired on its own
  body <- trimws(sub(header, "", body, fixed = TRUE))
  body <- pre_process_table_body(title, header, body)

  csv <- paste0(header, "\n", body)
  data <- read_csv(
    I(csv),
    show_col_types = FALSE,
    col_types = cols(.default = "c")
  )

  record_problems(data, label = title, path = report_path)

  # Rename "SBoE ID" by hand before the generic snake_case conversion
  names(data)[names(data) == "SBoE ID"] <- "sboe_id"
  names(data) <- snakecase::to_snake_case(names(data), parsing_option = 3)

  # Keep the post-processed table; the result used to be discarded, which
  # silently skipped the table-specific clean-up.
  data <- post_process_steps_for_table(data, title)

  data <- mutate(data, !!!info, .before = 1)
  structure(list(data), names = title)
}
-
# Repair "accounts" table rows that were broken across several lines.
#
# Arguments:
#   table   Snake-case table title; only "accounts" is processed.
#   header  The CSV header line; its comma count is the expected count per row.
#   body    The CSV body as a single newline-separated string.
#
# Returns `body` unchanged for other tables, or when every line already has
# the expected number of commas outside of double-quoted fields. Otherwise a
# line with too few commas is joined (with a single space) to the line that
# follows it, repeatedly, until it has at least the expected number.
pre_process_table_body <- function(table, header, body) {
  if (table != "accounts") {
    return(body)
  }

  count_commas <- function(x) {
    lengths(regmatches(x, gregexpr(",", x, fixed = TRUE)))
  }
  # Commas inside complete "quoted fields" are not column separators
  count_unquoted_commas <- function(x) {
    count_commas(gsub('("[^"]+")', "", x))
  }

  exp_commas <- count_commas(header)

  body_lines <- strsplit(body, "\n")[[1]]
  n_commas <- count_unquoted_commas(body_lines)

  if (all(n_commas == exp_commas)) {
    return(body)
  }

  i <- 1
  while (i < length(body_lines)) {
    if (n_commas[i] >= exp_commas) {
      i <- i + 1
      next
    }

    # Pull the next line up into this one ...
    body_lines[i] <- paste(body_lines[i], body_lines[i + 1], sep = " ")
    body_lines <- body_lines[-(i + 1)]
    n_commas <- n_commas[-(i + 1)]
    # ... and recount, so merging stops as soon as the row is complete
    # instead of swallowing every remaining line.
    n_commas[i] <- count_unquoted_commas(body_lines[i])
  }

  paste(body_lines, collapse = "\n")
}
-
# Apply the table-specific clean-up step for a parsed report table.
#
# "cover" and "accounts" tables are de-duplicated; "forgiven_loans" and
# "loan_proceeds" tables keep only rows with a non-missing `amount`; every
# other table is returned as is.
post_process_steps_for_table <- function(data, table) {
  if (table %in% c("cover", "accounts")) {
    return(distinct(data))
  }

  if (table %in% c("forgiven_loans", "loan_proceeds")) {
    return(filter(data, !is.na(amount)))
  }

  data
}
-
# Read every `*.txt` export in a directory and combine the parsed tables.
#
# Each file is read with `read_report_file()`, the per-file results are
# combined by `list_transpose_bind()`, and column types are set on each table.
# Reports from `report_list` that are part of this export group but have no
# row in the `cover` table get one built by `create_missing_cover()`.
process_report_export <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  export_files <- dir_ls(dir_sboe_id, glob = "*.txt")
  info <- report_info_in_report_list(export_files, report_list)

  tables <- map(info$path, read_report_file)
  tables <- list_transpose_bind(tables)
  tables <- map(tables, report_data_set_column_type)

  # Reports from the list that are in this export group
  in_this_export <- semi_join(report_list, info, by = "report_id")

  if (!"cover" %in% names(tables)) {
    tables$cover <- create_missing_cover(in_this_export)
    return(tables)
  }

  # ... but that don't have a cover row
  without_cover <- anti_join(in_this_export, tables$cover, by = "report_id")

  if (nrow(without_cover) > 0) {
    tables$cover <- bind_rows(
      tables$cover,
      create_missing_cover(without_cover)
    )
  }

  tables
}
-
# Convert the character columns of a parsed report table to dates and numbers.
#
# Columns whose name matches "_date" or "date_" are parsed with
# `lubridate::mdy()`, except those listed in `maybe_numeric` (for example
# "sum_to_date" matches the date pattern but is a number). Any `maybe_numeric`
# column that is present is parsed with `parse_number()`.
report_data_set_column_type <- function(data) {
  maybe_numeric <- c(
    "period",
    "cycle",
    "amount",
    "sum_to_date",
    "begin_balance",
    "end_balance"
  )

  with_dates <- mutate(
    data,
    across(
      c(matches("_date|date_"), -any_of(maybe_numeric)),
      lubridate::mdy
    )
  )

  mutate(with_dates, across(any_of(maybe_numeric), parse_number))
}
-
# Process one export directory and write each resulting table to parquet.
#
# Every table is written to
# `<project>/../data/<table>/sboe_id=<id>/part-0.parquet`, creating the
# directory first. Returns the path written for the `cover` table, or NULL
# when there is no table with that name.
write_processed_report_export <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  reports <- process_report_export(dir_sboe_id, report_list)

  partition <- sprintf("sboe_id=%s", report_path_info(dir_sboe_id)$sboe_id)
  data_root <- here::here("..", "data")

  cover_path <- c()

  for (table in names(reports)) {
    out_file <- fs::path(data_root, table, partition, "part-0", ext = "parquet")
    dir_create(fs::path_dir(out_file))

    if (table == "cover") {
      cover_path <- out_file
    }

    arrow::write_parquet(reports[[table]], out_file)
  }

  cover_path
}
-
# Build stand-in `cover` rows from report-list rows.
#
# `report_type` is "<year> <doc_name>"; the date columns are renamed from
# `start_date`, `end_date` and `received`. Address, committee type and fund
# columns are added as missing character values, placed around `report_type`.
create_missing_cover <- function(missing_cover) {
  cover <- mutate(missing_cover, report_type = paste(year, doc_name))

  cover <- select(
    cover,
    report_id,
    sboe_id,
    committee_name,
    report_type,
    date_from = start_date,
    date_to = end_date,
    date_filed = received
  )

  # Columns that come before report_type
  cover <- mutate(
    cover,
    street_1 = NA_character_,
    street_2 = NA_character_,
    city = NA_character_,
    state = NA_character_,
    zip_code = NA_character_,
    country = NA_character_,
    postal_code = NA_character_,
    committee_type = NA_character_,
    .before = report_type
  )

  # Columns that come right after report_type
  mutate(
    cover,
    fund_type = NA_character_,
    fund_name = NA_character_,
    .after = report_type
  )
}
|