# Combine the receipts CSV files found in one directory into a single table.
#
# Args:
#   dir_sboe_id: Directory whose entries are listed with `dir_ls()`; each
#     entry is treated as one report's receipts file.
#   report_list: Table with `sboe_id` and `report_id` columns naming the
#     reports to keep. Defaults to `tar_read(report_list)`.
#
# Returns:
#   The row-bound result of `read_receipts_csv()` over every kept, non-empty
#   file, with column names converted to snake case.
process_receipts_csv <- function(dir_sboe_id,
                                 report_list = tar_read(report_list)) {
  # One row of path-derived metadata per file, with the path carried along so
  # it can be handed to the reader below.
  csv_paths <- dir_ls(dir_sboe_id)
  report_info <- report_path_info(csv_paths)
  report_info$path <- csv_paths

  # Keep only the files whose (sboe_id, report_id) pair is in `report_list`.
  wanted <- semi_join(
    report_info,
    report_list,
    by = c("sboe_id", "report_id")
  )

  # Reads a single report; a file with size below 1 yields NULL instead of
  # being parsed. Extra metadata columns are absorbed by `...`.
  read_one <- function(sboe_id, report_id, path, ...) {
    if (file_size(path) < 1) {
      NULL
    } else {
      read_receipts_csv(path, sboe_id, report_id)
    }
  }

  combined <- list_rbind(pmap(wanted, read_one))
  names(combined) <- snakecase::to_snake_case(
    names(combined),
    parsing_option = 3
  )
  combined
}

# Write the combined receipts for one directory to a parquet file.
#
# Args:
#   dir_sboe_id: Directory passed to `process_receipts_csv()`; also passed to
#     `report_path_info()` to obtain the `sboe_id` used in the output path.
#   report_list: Forwarded unchanged to `process_receipts_csv()`.
#
# Returns:
#   The path of the written file,
#   `<here>/../data/receipts/sboe_id=<id>/part-0.parquet`.
write_receipts_parquet <- function(dir_sboe_id,
                                   report_list = tar_read(report_list)) {
  receipts <- process_receipts_csv(dir_sboe_id, report_list)

  # The output directory is keyed by the sboe_id derived from the input path.
  committee <- report_path_info(dir_sboe_id)
  partition <- sprintf("sboe_id=%s", committee$sboe_id)
  out_dir <- here::here("..", "data", "receipts", partition)
  out_file <- path(out_dir, "part-0.parquet")

  dir_create(out_dir)
  arrow::write_parquet(receipts, out_file)
  out_file
}

# Read a single receipts CSV and tag every row with its report identifiers.
#
# Args:
#   path: Path of the CSV file to read.
#   sboe_id, report_id: Identifiers to attach to every row. When either is
#     NULL, both are taken from `report_path_info(path)` instead.
#
# Returns:
#   The parsed table with `IsPrior` converted to logical and `sboe_id` /
#   `report_id` columns added.
read_receipts_csv <- function(path, sboe_id = NULL, report_id = NULL) {
  ids_missing <- is.null(sboe_id) || is.null(report_id)
  if (ids_missing) {
    path_info <- report_path_info(path)
    sboe_id <- path_info$sboe_id
    report_id <- path_info$report_id
  }

  # Every column is read as text unless given a specific type here. IsPrior
  # is deliberately read as text because it is compared against "X" below.
  receipt_types <- cols(
    .default = col_character(),
    GroupID = col_integer(),
    IsOrg = col_logical(),
    IsUS = col_logical(),
    Amount = col_double(),
    SumToDate = col_double(),
    IsAggregated = col_logical(),
    IsPrior = col_character()
  )

  # NOTE(review): the name `x` is kept on purpose in case record_problems()
  # inspects the expression it is called with - confirm against its source.
  x <- read_csv(path, col_types = receipt_types)
  record_problems(x, label = "receipts")

  # TRUE where the field is exactly "X"; a missing value stays NA (not FALSE).
  flagged <- mutate(x, IsPrior = IsPrior == "X")

  # NOTE(review): `.before = 0` appears intended to put the two id columns
  # first - confirm against the dplyr version in use.
  mutate(flagged, sboe_id = sboe_id, report_id = report_id, .before = 0)
}