# Read one exported report file and return its tables.
#
# The file is split into sections on blank lines; each section is parsed by
# `read_report_section()`. Returns a flat, named list with one data frame per
# section that was not skipped, named by the section's snake_case title.
read_report_file <- function(report_path) {
  info <- report_path_info(report_path)
  contents <- brio::read_file(report_path)

  # Sections are separated by an empty line ("\n\n", or "\n\r\n" when the file
  # uses CRLF line endings).
  sections_raw <- strsplit(contents, "\n\r?\n")[[1]]
  sections_raw <- trimws(sections_raw)
  sections_raw <- sections_raw[nzchar(sections_raw)]

  sections <- sections_raw |>
    map(read_report_section, info = info, report_path = report_path)

  # Each element is either NULL (skipped section) or a one-element named list;
  # flattening yields a single named list of data frames.
  purrr::flatten(sections)
}

# Snake-case titles of the report sections that `read_report_section()` skips.
skip_report_sections <- function() {
  c(
    # receipts and expenditures are collected separately
    "receipts",
    "expenditures",
    # the debts tables have __problems__
    "debts_owed_to_the_committee",
    "debts_owed_by_the_committee"
  )
}

# Parse a single section of a report export.
#
# A section is expected to be: an upper-case title line, a CSV header line, and
# zero or more CSV rows.
#
# section:     the raw text of the section.
# info:        values spliced into the result as leading columns (this is the
#              return value of `report_path_info()` as passed by the caller).
# report_path: path of the file the section came from; forwarded to
#              `record_problems()`.
#
# Returns NULL for sections listed in `skip_report_sections()`, otherwise a
# one-element list holding the parsed data frame, named by the section title.
# Stops with an error when the section does not start with a title line.
read_report_section <- function(section, info, report_path) {
  if (!grepl("^[A-Z ]+\n", section)) {
    stop("Expected a title at the start of a section")
  }

  title <- snakecase::to_snake_case(sub("\n.+", "", section))
  if (title %in% skip_report_sections()) {
    return(NULL)
  }

  # remove title
  body <- sub("^[A-Z ]+\n", "", section)
  header <- strsplit(body, "\n")[[1]][[1]]

  # # trailing commas should be on the previous line
  # body <- gsub("(\\w) ?\n,,,", "\\1,,,", body)

  # remove header
  body <- trimws(sub(header, "", body, fixed = TRUE))
  # body <- gsub("([^,]+ )\n([^,]+)", "\\1\\2", body)
  body <- pre_process_table_body(title, header, body)

  csv <- paste0(header, "\n", body)
  # Read every column as character; typing happens later in
  # `report_data_set_column_type()`.
  data <- read_csv(
    I(csv),
    show_col_types = FALSE,
    col_types = cols(.default = "c")
  )
  record_problems(data, label = title, path = report_path)

  # Rename "SBoE ID" explicitly before the generic snake_case conversion.
  names(data)[names(data) == "SBoE ID"] <- "sboe_id"
  names(data) <- snakecase::to_snake_case(names(data), parsing_option = 3)

  # The post-processing step returns a new data frame, so its result has to be
  # assigned back for the table-specific filtering to take effect.
  data <- post_process_steps_for_table(data, title)
  data <- mutate(data, !!!info, .before = 1)

  structure(list(data), names = title)
}

# Repair rows of the "accounts" table that were broken across several lines.
#
# table:  snake_case title of the table; every table other than "accounts" is
#         returned untouched.
# header: the CSV header line; its comma count is the expected count per row.
# body:   the CSV rows as a single newline-separated string.
#
# A row with fewer commas than the header (ignoring commas inside double-quoted
# fields) is joined with the following line, separated by a space, until it has
# at least the expected number of commas. Returns the repaired body as a
# single newline-separated string.
pre_process_table_body <- function(table, header, body) {
  if (table != "accounts") {
    return(body)
  }

  count_commas <- function(x) {
    lengths(regmatches(x, gregexpr(",", x, fixed = TRUE)))
  }
  # Drop double-quoted fields so commas inside them are not counted.
  strip_quoted_fields <- function(x) {
    gsub('("[^"]+")', "", x)
  }

  exp_commas <- count_commas(header)
  body_lines <- strsplit(body, "\n")[[1]]
  unquoted_lines <- strip_quoted_fields(body_lines)

  if (all(count_commas(unquoted_lines) == exp_commas)) {
    return(body)
  }

  i <- 1
  while (i < length(body_lines)) {
    if (count_commas(unquoted_lines[i]) >= exp_commas) {
      i <- i + 1
      next
    }

    # Row is short: pull the next line into it and re-check the same row.
    body_lines[i] <- paste(body_lines[i], body_lines[i + 1], sep = " ")
    body_lines <- body_lines[-(i + 1)]
    unquoted_lines <- unquoted_lines[-(i + 1)]
    # Refresh the quote-stripped copy of the merged row; without this its
    # comma count never changes and all remaining lines get merged into it.
    unquoted_lines[i] <- strip_quoted_fields(body_lines[i])
  }

  paste(body_lines, collapse = "\n")
}

# Apply table-specific clean-up to a parsed table.
#
# For "forgiven_loans" and "loan_proceeds", rows with a missing `amount` are
# dropped; every other table is returned unchanged.
post_process_steps_for_table <- function(data, table) {
  switch(
    table,
    forgiven_loans = ,
    loan_proceeds = filter(data, !is.na(amount)),
    data
  )
}

# Read every "*.txt" export in `dir_sboe_id` and combine them per table.
#
# Each file is read with `read_report_file()`; the per-file results are
# transposed so that tables of the same name are row-bound together, and the
# columns of each combined table are typed by `report_data_set_column_type()`.
# Returns a list of data frames, one per table.
process_report_export <- function(dir_sboe_id,
                                  report_list = tar_read(report_list)) {
  all_exports <- dir_ls(dir_sboe_id, glob = "*.txt")
  info <- report_info_in_report_list(all_exports, report_list)

  info$path |>
    map(read_report_file) |>
    transpose() |>
    map(list_rbind) |>
    map(report_data_set_column_type)
}

# Convert the character columns of a report table to dates and numbers.
#
# Columns whose name matches "_date" or "date_" are parsed with
# `lubridate::mdy()`, except those listed in `maybe_numeric`, which are parsed
# with `parse_number()` whenever they are present.
report_data_set_column_type <- function(data) {
  maybe_numeric <- c(
    "period",
    "cycle",
    "amount",
    "sum_to_date",
    "begin_balance",
    "end_balance"
  )

  data |>
    mutate(
      across(
        c(matches("_date|date_"), -any_of(maybe_numeric)),
        lubridate::mdy
      ),
      across(any_of(maybe_numeric), parse_number)
    )
}

# Process the exports in `dir_sboe_id` and write each table as parquet.
#
# Every table is written to
# `<here>/../data/<table>/sboe_id=<sboe_id>/part-0.parquet`, creating the
# directory when needed. Returns the path of the "cover" table, or NULL when
# the processed reports contain no "cover" table.
write_processed_report_export <- function(dir_sboe_id,
                                          report_list = tar_read(report_list)) {
  reports <- process_report_export(dir_sboe_id, report_list)
  sboe_id <- report_path_info(dir_sboe_id)$sboe_id

  base_dir <- here::here("..", "data")
  sboe_id_param <- sprintf("sboe_id=%s", sboe_id)

  return_path <- c()
  for (table in names(reports)) {
    path <- fs::path(
      base_dir,
      table,
      sboe_id_param,
      "part-0",
      ext = "parquet"
    )
    dir_create(fs::path_dir(path))
    if (table == "cover") {
      return_path <- path
    }
    arrow::write_parquet(reports[[table]], path)
  }

  return_path
}