소스 검색

process raw data in a new project

main
Garrick Aden-Buie 2 년 전
부모
커밋
8593c43fbe
No known key found for this signature in database
13개의 변경된 파일9670개의 추가작업 그리고 16개의 파일을 삭제
  1. +1
    -0
      .gitignore
  2. +32
    -0
      process/DESCRIPTION
  3. +65
    -0
      process/R/process_expenditures_csv.R
  4. +66
    -0
      process/R/process_receipts_csv.R
  5. +13
    -0
      process/R/process_report_list.R
  6. +0
    -16
      process/R/read_report_file.R
  7. +11
    -0
      process/R/record_problems.R
  8. +11
    -0
      process/R/report_path_info.R
  9. +54
    -0
      process/_targets.R
  10. +5
    -0
      process/_targets/.gitignore
  11. +9361
    -0
      process/_targets/meta/meta
  12. +22
    -0
      process/nc-campaign-finance_process.Rproj
  13. +29
    -0
      process/run.R

+ 1
- 0
.gitignore 파일 보기

@@ -7,3 +7,4 @@
data-raw
collect/data-raw/
collect/data-old/
data/

+ 32
- 0
process/DESCRIPTION 파일 보기

@@ -0,0 +1,32 @@
Type: Project
Package: nc-campaign-finance-process
Title: Process the NC Campaign Finance Data
Version: 0.0.0.9000
Authors@R:
person("Garrick", "Aden-Buie", , "garrick@adenbuie.com", role = c("aut", "cre"),
comment = c(ORCID = "0000-0002-7111-0077"))
Description: Process the NC Campaign Finance Data.
License: Proprietary
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
Depends:
dplyr,
fs,
glue,
httr2,
purrr,
readr,
rlang,
targets
Imports:
arrow,
cli,
crew,
DBI,
dbplyr,
desc,
docopt,
duckdb,
here,
visNetwork

+ 65
- 0
process/R/process_expenditures_csv.R 파일 보기

@@ -0,0 +1,65 @@
# Read every expenditures file in one directory and stack the results.
#
# dir_sboe_id: directory whose files are listed with dir_ls().
# report_list: data frame with `sboe_id` and `report_id` columns; only files
#   whose ids appear in it are read. Defaults to the `report_list` target.
#
# Returns the row-bound result of read_expenditures_csv() over the matching
# files, with column names converted to snake_case.
process_expenditures_csv <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  csv_paths <- dir_ls(dir_sboe_id)

  # Derive sboe_id / report_id from each file location and keep the path next
  # to them so each row carries everything the reader needs.
  candidates <- report_path_info(csv_paths)
  candidates$path <- csv_paths

  # Keep only the files whose report is listed in `report_list`.
  wanted <- semi_join(candidates, report_list, by = c("sboe_id", "report_id"))

  read_one <- function(sboe_id, report_id, path, ...) {
    read_expenditures_csv(path, sboe_id, report_id)
  }

  per_report <- pmap(wanted, read_one)
  combined <- list_rbind(per_report)

  names(combined) <- snakecase::to_snake_case(names(combined), parsing_option = 3)

  combined
}

# Process one expenditures directory and write the result as a parquet file.
#
# dir_sboe_id: directory passed on to process_expenditures_csv(); its sboe_id
#   (as extracted by report_path_info()) names the output partition.
# report_list: passed through to process_expenditures_csv().
#
# Returns the path of the parquet file that was written.
write_expenditures_parquet <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  # Do the processing first so nothing is created on disk if it fails.
  rows <- process_expenditures_csv(dir_sboe_id, report_list)

  partition <- sprintf("sboe_id=%s", report_path_info(dir_sboe_id)$sboe_id)
  out_dir <- here::here("..", "data", "expenditures", partition)
  out_file <- path(out_dir, "part-0.parquet")

  dir_create(out_dir)
  arrow::write_parquet(rows, out_file)

  out_file
}

# Read a single expenditures CSV and tag its rows with sboe_id and report_id.
#
# path: file to read.
# sboe_id, report_id: identifiers to attach; when either is NULL both are
#   taken from report_path_info(path).
#
# Returns NULL for a file smaller than one byte, otherwise the parsed data
# with `sboe_id` and `report_id` added as the leading columns.
read_expenditures_csv <- function(path, sboe_id = NULL, report_id = NULL) {
  # Nothing to parse in an empty file.
  if (file_size(path) < 1) {
    return(NULL)
  }

  ids_missing <- is.null(sboe_id) || is.null(report_id)
  if (ids_missing) {
    from_path <- report_path_info(path)
    sboe_id <- from_path$sboe_id
    report_id <- from_path$report_id
  }

  # Everything is text unless listed here.
  spec <- cols(
    .default = col_character(),
    OccurDate = col_date("%m/%d/%Y"),
    IsOrg = col_logical(),
    IsUS = col_logical(),
    Amount = col_double(),
    SumToDate = col_double(),
    IsAggregated = col_logical()
  )
  parsed <- read_csv(path, col_types = spec)

  # Log any parsing problems before the data moves on.
  record_problems(parsed, label = "expenditures")

  mutate(parsed, sboe_id = sboe_id, report_id = report_id, .before = 0)
}

+ 66
- 0
process/R/process_receipts_csv.R 파일 보기

@@ -0,0 +1,66 @@
# Read every receipts file in one directory and stack the results.
#
# dir_sboe_id: directory whose files are listed with dir_ls().
# report_list: data frame with `sboe_id` and `report_id` columns; only files
#   whose ids appear in it are read. Defaults to the `report_list` target.
#
# Returns the row-bound result of read_receipts_csv() over the matching,
# non-empty files, with column names converted to snake_case.
process_receipts_csv <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  csv_paths <- dir_ls(dir_sboe_id)

  # Derive sboe_id / report_id from each file location and keep the path next
  # to them so each row carries everything the reader needs.
  candidates <- report_path_info(csv_paths)
  candidates$path <- csv_paths

  # Keep only the files whose report is listed in `report_list`.
  wanted <- semi_join(candidates, report_list, by = c("sboe_id", "report_id"))

  read_one <- function(sboe_id, report_id, path, ...) {
    # Files smaller than one byte contribute nothing.
    if (file_size(path) < 1) {
      return(NULL)
    }
    read_receipts_csv(path, sboe_id, report_id)
  }

  per_report <- pmap(wanted, read_one)
  combined <- list_rbind(per_report)

  names(combined) <- snakecase::to_snake_case(names(combined), parsing_option = 3)

  combined
}

# Process one receipts directory and write the result as a parquet file.
#
# dir_sboe_id: directory passed on to process_receipts_csv(); its sboe_id
#   (as extracted by report_path_info()) names the output partition.
# report_list: passed through to process_receipts_csv().
#
# Returns the path of the parquet file that was written.
write_receipts_parquet <- function(dir_sboe_id, report_list = tar_read(report_list)) {
  # Do the processing first so nothing is created on disk if it fails.
  rows <- process_receipts_csv(dir_sboe_id, report_list)

  partition <- sprintf("sboe_id=%s", report_path_info(dir_sboe_id)$sboe_id)
  out_dir <- here::here("..", "data", "receipts", partition)
  out_file <- path(out_dir, "part-0.parquet")

  dir_create(out_dir)
  arrow::write_parquet(rows, out_file)

  out_file
}

# Read a single receipts CSV and tag its rows with sboe_id and report_id.
#
# path: file to read.
# sboe_id, report_id: identifiers to attach; when either is NULL both are
#   taken from report_path_info(path).
#
# Returns NULL for a file smaller than one byte, otherwise the parsed data
# with `IsPrior` converted to a logical and `sboe_id` / `report_id` added as
# the leading columns.
read_receipts_csv <- function(path, sboe_id = NULL, report_id = NULL) {
  # Same guard as read_expenditures_csv(): an empty file yields no columns,
  # so the IsPrior conversion below would fail on it.
  if (fs::file_size(path) < 1) {
    return(NULL)
  }

  if (is.null(sboe_id) || is.null(report_id)) {
    info <- report_path_info(path)
    sboe_id <- info$sboe_id
    report_id <- info$report_id
  }

  x <- read_csv(
    path,
    col_types = cols(
      .default = col_character(),
      GroupID = col_integer(),
      IsOrg = col_logical(),
      IsUS = col_logical(),
      Amount = col_double(),
      SumToDate = col_double(),
      IsAggregated = col_logical(),
      # Read as text; converted to a logical below.
      IsPrior = col_character()
    )
  )

  # Log any parsing problems before the data moves on.
  record_problems(x, label = "receipts")

  x |>
    # TRUE where the value is "X"; missing values stay NA.
    mutate(IsPrior = IsPrior == "X") |>
    mutate(sboe_id = sboe_id, report_id = report_id, .before = 0)
}

+ 13
- 0
process/R/process_report_list.R 파일 보기

@@ -0,0 +1,13 @@
# Convert the report list CSV to parquet.
#
# path_report_list: path of the CSV to read; `year` and `report_id` are parsed
#   as integers, other columns are left to read_csv()'s type guessing.
#
# Writes ../data/report_list/part-0.parquet (creating the directory if needed)
# and returns the value of arrow::write_parquet().
process_report_list <- function(path_report_list) {
  parquet_path <- path("..", "data", "report_list", "part-0.parquet")
  dir_create(path_dir(parquet_path))

  spec <- cols(
    year = col_integer(),
    report_id = col_integer()
  )
  reports <- read_csv(path_report_list, col_types = spec)

  arrow::write_parquet(reports, parquet_path)
}

collect/R/read_report_file.R → process/R/read_report_file.R 파일 보기

@@ -98,19 +98,3 @@ write_reports_by_sboe_id <- function(sboe_id) {

return_path
}

# Extract the sboe_id and report_id encoded in a single report path.
#
# report_path: one path laid out as
#   data-raw/reports/{sboe_id}/all/{report_id}_{received_date}.txt
#
# Returns a list with `sboe_id` (character) and `report_id` (integer).
report_path_info <- function(report_path) {
  reports_root <- here::here("data-raw", "reports")
  relative <- fs::path_rel(fs::path_abs(report_path), reports_root)
  parts <- fs::path_split(relative)[[1]]

  # parts[1] is the sboe_id, parts[2] is "all", parts[3] is the file name;
  # the report id is whatever precedes the first underscore of the file name.
  file_name <- parts[3]
  leading_id <- strsplit(file_name, "_")[[1]][1]

  list(
    sboe_id = parts[1],
    report_id = as.integer(leading_id)
  )
}

+ 11
- 0
process/R/record_problems.R 파일 보기

@@ -0,0 +1,11 @@
# Append the parsing problems attached to `x` to a per-label CSV log.
#
# x: data whose parsing problems are retrieved with problems().
# path: unused; kept (now optional) so existing calls keep working.
# label: base name of the log file, written as `<label>.csv` inside the
#   `problems` directory located via here::here("..", "problems").
#
# Returns NULL invisibly when there are no problems, otherwise the value of
# write_csv().
record_problems <- function(x, path = NULL, label) {
  found <- problems(x)
  if (nrow(found) == 0) {
    return(invisible())
  }

  problem_dir <- here::here("..", "problems")
  # Use the qualified fs::path(): an unqualified path() call is resolved
  # through this function's own `path` argument first, and when that argument
  # was not supplied R stops with 'argument "path" is missing, with no default'.
  problem_path <- fs::path(problem_dir, label, ext = "csv")
  dir_create(problem_dir)

  # Append when the log already exists so earlier entries are preserved.
  write_csv(found, problem_path, append = file_exists(problem_path))
}

+ 11
- 0
process/R/report_path_info.R 파일 보기

@@ -0,0 +1,11 @@
# Extract the sboe_id and report_id encoded in one or more report paths.
#
# report_path: paths below ../data-raw/reports, laid out as
#   data-raw/reports/{sboe_id}/all/{report_id}_{received_date}.txt
#   (the second component is not inspected).
#
# Returns a tibble with one row per path: `sboe_id` is the first component
# below the reports root, `report_id` is the integer before the first
# underscore of the third component.
report_path_info <- function(report_path) {
  reports_root <- here::here("..", "data-raw", "reports")
  relative <- fs::path_rel(fs::path_abs(report_path), reports_root)

  # One character vector of path components per input path.
  parts <- list_flatten(map(relative, path_split))

  first_component <- function(p) p[1]
  leading_report_id <- function(p) {
    as.integer(strsplit(p[3], "_")[[1]][1])
  }

  tibble(
    sboe_id = map_vec(parts, first_component),
    report_id = map_int(parts, leading_report_id)
  )
}

+ 54
- 0
process/_targets.R 파일 보기

@@ -0,0 +1,54 @@
# Pipeline definition for the {targets} package (scaffolded by use_targets()).
# The script sets global target options, sources the helper functions in R/,
# and ends with the list of targets that make up the pipeline.
# See the manual for how to inspect and run the pipeline:
# https://books.ropensci.org/targets/walkthrough.html#inspect-the-pipeline

# Load packages required to define the pipeline:
library(targets)

# Set target options:
tar_option_set(
# Packages for the targets: the entries of the DESCRIPTION `Depends` field,
# split on ", ".
packages = strsplit(desc::desc_get_field("Depends"), ", ")[[1]],
# Run tar_make() through a local {crew} controller with 12 workers,
# as discussed at https://books.ropensci.org/targets/crew.html.
controller = crew::crew_controller_local(workers = 12),
# Debugging switches, left disabled:
# debug = "path_receipts_parquet_8d195f7e",
# cue = tar_cue(mode = "never")
error = "stop"
)

# Run the R scripts in the R/ folder with your custom functions:
tar_source()

# The pipeline: the value of this final expression is the list of targets.
list(
# The raw report list CSV, tracked as a file, and its processed form.
tar_target(path_report_list, "../data-raw/report_list.csv", format = "file"),
tar_target(report_list, process_report_list(path_report_list)),

# Directory listings under ../data-raw/reports, one target per directory name.
# NOTE(review): `dirs_all` is not referenced by any other target in this list.
tar_target(
dirs_all,
fs::dir_ls("../data-raw/reports", glob = "**/all", recurse = TRUE, type = "directory")
),
tar_target(
dirs_receipts,
fs::dir_ls("../data-raw/reports", glob = "**/receipts", recurse = TRUE, type = "directory")
),
tar_target(
dirs_expenditures,
fs::dir_ls("../data-raw/reports", glob = "**/expenditures", recurse = TRUE, type = "directory")
),

# One branch per receipts directory (pattern = map(...)); each branch returns
# the path of the parquet file it wrote, tracked as a file.
tar_target(
path_receipts_parquet,
write_receipts_parquet(dirs_receipts, report_list),
pattern = map(dirs_receipts),
format = "file"
),

# Same as above for the expenditures directories.
tar_target(
path_expenditures_parquet,
write_expenditures_parquet(dirs_expenditures, report_list),
pattern = map(dirs_expenditures),
format = "file"
)
)

+ 5
- 0
process/_targets/.gitignore 파일 보기

@@ -0,0 +1,5 @@
*
!.gitignore
!meta
meta/*
!meta/meta

+ 9361
- 0
process/_targets/meta/meta
파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
파일 보기


+ 22
- 0
process/nc-campaign-finance_process.Rproj 파일 보기

@@ -0,0 +1,22 @@
Version: 1.0

RestoreWorkspace: No
SaveWorkspace: No
AlwaysSaveHistory: Default

EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 2
Encoding: UTF-8

RnwWeave: Sweave
LaTeX: pdfLaTeX

AutoAppendNewline: Yes
StripTrailingWhitespace: Yes
LineEndingConversion: Posix

BuildType: Package
PackageUseDevtools: Yes
PackageInstallArgs: --no-multiarch --with-keep.source
PackageRoxygenize: rd,collate,namespace

+ 29
- 0
process/run.R 파일 보기

@@ -0,0 +1,29 @@
#!/usr/bin/env Rscript

# Command-line entry point for the pipeline.
# `run.R all` builds every target; `run.R target <targets>...` builds only the
# named ones. See https://books.ropensci.org/targets/hpc.html for other ways
# to execute the pipeline.

# Usage text parsed by docopt to define the command-line interface.
doc <- 'usage:
run.R all
run.R target <targets>...
run.R -h | --help

options:
-h --help Show this screen'

library(docopt)
opts <- docopt(doc)

# Environment flags set for the duration of the run.
Sys.setenv(IN_TARGETS = "true", ALLOW_DOWNLOADS = "true")

if (opts$all) {
  cli::cli_alert_info("Running all targets.")
  targets::tar_make()
} else {
  cli::cli_alert_info("Running targets: {.and {.field {opts$targets}}}")
  # Only the requested target names are built.
  targets::tar_make(targets::any_of(!!opts$targets))
}
# targets::tar_make_clustermq(workers = 2) # nolint

Loading…
취소
저장