# Read one report export file and parse each of its sections.
#
# The file is read as a single string and split on blank lines; every
# non-empty chunk is handed to `read_report_section()`. The per-section
# results are flattened one level into a single list.
read_report_file <- function(report_path) {
  info <- report_path_info(report_path)
  lines <- brio::read_file(report_path)

  sections_raw <- strsplit(lines, "\n\r?\n")[[1]]
  sections_raw <- trimws(sections_raw)
  sections_raw <- sections_raw[nzchar(sections_raw)]

  sections <- sections_raw |>
    map(read_report_section, info = info, report_path = report_path)

  purrr::flatten(sections)
}

# Snake-cased titles of the report sections that `read_report_section()`
# skips (it returns `NULL` for them).
skip_report_sections <- function() {
  c(
    # receipts and expenditures are collected separately
    "receipts",
    "expenditures",
    # the debts tables have __problems__
    "debts_owed_to_the_committee",
    "debts_owed_by_the_committee"
  )
}

# Parse a single report section into a one-element named list.
#
# `section` must start with an upper-case title line, followed by a CSV
# header line and the CSV body. `info` is spliced in as leading columns of
# the parsed table. `report_path` is only forwarded to `record_problems()`.
#
# Returns `NULL` for sections listed in `skip_report_sections()`; otherwise a
# list of length one, named with the snake-cased title, holding the table.
# Stops with an error when the section does not start with a title line.
read_report_section <- function(section, info, report_path) {
  if (!grepl("^[A-Z ]+\n", section)) {
    # browser()
    stop("Expected a title at the start of a section")
  }

  title <- snakecase::to_snake_case(sub("\n.+", "", section))
  if (title %in% skip_report_sections()) {
    return(NULL)
  }

  # remove title
  body <- sub("^[A-Z ]+\n", "", section)
  header <- strsplit(body, "\n")[[1]][[1]]

  # # trailing commas should be on the previous line
  # body <- gsub("(\\w) ?\n,,,", "\\1,,,", body)

  # remove header
  body <- trimws(sub(header, "", body, fixed = TRUE))
  # body <- gsub("([^,]+ )\n([^,]+)", "\\1\\2", body)
  body <- pre_process_table_body(title, header, body)

  csv <- paste0(header, "\n", body)

  # Every column is read as character; typing happens later in
  # `report_data_set_column_type()`.
  data <- read_csv(
    I(csv),
    show_col_types = FALSE,
    col_types = cols(.default = "c")
  )
  record_problems(data, label = title, path = report_path)

  # Rename "SBoE ID" explicitly before the generic snake_case conversion.
  if ("SBoE ID" %in% names(data)) {
    names(data)[which("SBoE ID" == names(data))] <- "sboe_id"
  }
  names(data) <- snakecase::to_snake_case(names(data), parsing_option = 3)

  data <- post_process_steps_for_table(data, title)
  data <- mutate(data, !!!info, .before = 1)

  structure(list(data), names = title)
}

# Repair records of the "accounts" table that were broken across lines.
#
# `table` is the snake-cased section title, `header` the CSV header line and
# `body` the CSV body as one string. For any table other than "accounts",
# or when every body line already has as many commas (outside of quoted
# fields) as the header, `body` is returned untouched. Otherwise a line with
# too few commas is joined, separated by a space, with the following line(s)
# until it has at least the expected number of commas.
#
# Returns the (possibly repaired) body as a single newline-joined string.
pre_process_table_body <- function(table, header, body) {
  if (table != "accounts") {
    return(body)
  }

  # Count literal commas per element (0 for strings without any).
  count_commas <- function(x) {
    lengths(regmatches(x, gregexpr(",", x, fixed = TRUE)))
  }
  # Drop complete quoted fields so commas inside them are not counted.
  strip_quoted_fields <- function(x) {
    gsub('("[^"]+")', "", x)
  }

  exp_commas <- count_commas(header)
  body_lines <- strsplit(body, "\n")[[1]]
  body_lines_no_quoted_fields <- strip_quoted_fields(body_lines)

  if (all(count_commas(body_lines_no_quoted_fields) == exp_commas)) {
    return(body)
  }

  i <- 1
  while (i < length(body_lines)) {
    if (count_commas(body_lines_no_quoted_fields[i]) >= exp_commas) {
      i <- i + 1
      next
    }

    body_lines[i] <- paste(body_lines[i], body_lines[i + 1], sep = " ")
    body_lines <- body_lines[-(i + 1)]
    body_lines_no_quoted_fields <- body_lines_no_quoted_fields[-(i + 1)]
    # Re-derive the comma-counting view of the merged line. Without this the
    # stale (too low) count would make the loop keep swallowing every
    # following line into this record.
    body_lines_no_quoted_fields[i] <- strip_quoted_fields(body_lines[i])
  }

  paste(body_lines, collapse = "\n")
}

# Table-specific clean-up applied after a section has been parsed:
# "cover" and "accounts" are de-duplicated, "forgiven_loans" and
# "loan_proceeds" drop rows with a missing `amount`, and any other table is
# returned unchanged.
post_process_steps_for_table <- function(data, table) {
  switch(
    table,
    cover = ,
    accounts = distinct(data),
    forgiven_loans = ,
    loan_proceeds = filter(data, !is.na(amount)),
    data
  )
}

# Read every `*.txt` export in `dir_sboe_id` and combine them per table.
#
# Each file is parsed with `read_report_file()`, the results are combined
# with `list_transpose_bind()` and column types are set with
# `report_data_set_column_type()`. Reports of this export group that have no
# "cover" row get one synthesised from `report_list` via
# `create_missing_cover()`.
#
# Returns the list of tables, always including `cover`.
process_report_export <- function(dir_sboe_id,
                                  report_list = tar_read(report_list)) {
  all_exports <- dir_ls(dir_sboe_id, glob = "*.txt")
  info <- report_info_in_report_list(all_exports, report_list)

  all <- info$path |>
    map(read_report_file) |>
    list_transpose_bind() |>
    map(report_data_set_column_type)

  # No cover table at all: build one for every report in this export group.
  if (!"cover" %in% names(all)) {
    all$cover <- semi_join(report_list, info, by = "report_id") |>
      create_missing_cover()
    return(all)
  }

  missing_cover <- report_list |>
    semi_join(info, by = "report_id") |> # in this export group
    anti_join(all$cover, by = "report_id") # but doesn't have cover

  if (nrow(missing_cover) == 0) {
    return(all)
  }

  all$cover <- bind_rows(all$cover, create_missing_cover(missing_cover))
  all
}

# Convert the all-character columns of a parsed table to their final types:
# columns whose name matches `_date` or `date_` are parsed with
# `lubridate::mdy()`, and the columns listed in `maybe_numeric` (when
# present) with `parse_number()`. `sum_to_date` matches the date pattern, so
# the numeric names are explicitly excluded from the date selection.
report_data_set_column_type <- function(data) {
  maybe_numeric <- c(
    "period",
    "cycle",
    "amount",
    "sum_to_date",
    "begin_balance",
    "end_balance"
  )

  data |>
    mutate(
      across(
        c(matches("_date|date_"), -any_of(maybe_numeric)),
        lubridate::mdy
      ),
      across(any_of(maybe_numeric), parse_number)
    )
}

# Process the exports in `dir_sboe_id` and write them out as parquet files.
#
# When `report_path_info(dir_sboe_id)` yields exactly one `sboe_id` and it is
# not "No Id", all tables are written under that id. Otherwise the ids are
# taken from the data itself and each table is split by `sboe_id` before
# writing.
#
# Returns the value(s) of `write_processed_report_export_parquet()`; in the
# multi-id case these are flattened to an unnamed character vector.
write_processed_report_export <- function(dir_sboe_id,
                                          report_list = tar_read(report_list)) {
  reports <- process_report_export(dir_sboe_id, report_list)

  info <- report_path_info(dir_sboe_id)
  if (!any(info$sboe_id == "No Id") && length(unique(info$sboe_id)) == 1) {
    return(write_processed_report_export_parquet(reports, unique(info$sboe_id)))
  }

  sboe_ids <- map_dfr(reports, select, sboe_id) |>
    pull(sboe_id) |>
    unique()

  # One list of tables per id, each table filtered down to that id's rows.
  reports <- sboe_ids |>
    set_names() |>
    map(\(id) map(reports, \(d) filter(d, sboe_id == id)))

  map(sboe_ids, \(id) write_processed_report_export_parquet(reports[[id]], id)) |>
    flatten_chr() |>
    unname()
}

# Write each table in `reports` to
# `<here>/../data/<table>/sboe_id=<sboe_id>/part-0.parquet`, creating the
# directory first.
#
# Returns the path of the "cover" table's file, or `NULL` when `reports` has
# no "cover" element.
write_processed_report_export_parquet <- function(reports, sboe_id) {
  base_dir <- here::here("..", "data")
  sboe_id_param <- sprintf("sboe_id=%s", sboe_id)

  return_path <- c()

  for (table in names(reports)) {
    path <- fs::path(base_dir, table, sboe_id_param, "part-0", ext = "parquet")
    dir_create(fs::path_dir(path))
    if (table == "cover") {
      return_path <- path
    }
    arrow::write_parquet(reports[[table]], path)
  }

  return_path
}

# Build placeholder "cover" rows from report-list rows. `report_type` is
# `year` and `doc_name` pasted together, the date columns are renamed from
# the report list, and the remaining cover columns are filled with
# `NA_character_`.
create_missing_cover <- function(missing_cover) {
  missing_cover |>
    mutate(report_type = paste(year, doc_name)) |>
    select(
      report_id,
      sboe_id,
      committee_name,
      report_type,
      date_from = start_date,
      date_to = end_date,
      date_filed = received
    ) |>
    mutate(
      street_1 = NA_character_,
      street_2 = NA_character_,
      city = NA_character_,
      state = NA_character_,
      zip_code = NA_character_,
      country = NA_character_,
      postal_code = NA_character_,
      committee_type = NA_character_,
      .before = report_type
    ) |>
    mutate(
      fund_type = NA_character_,
      fund_name = NA_character_,
      .after = report_type
    )
}