No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

158 líneas
4.3KB

  #' Save tweets to a file without dropping previously stored tweets
  #'
  #' When `file` already exists, its contents are read with `read_fun` and any
  #' stored rows whose `status_id` is absent from `tweets` are appended to
  #' `tweets` before the combined data is written with `save_fun`.
  #'
  #' @param tweets A data frame of tweets with a `status_id` column. `NULL` or
  #'   a zero-row data frame is returned as-is and nothing is written.
  #' @param file Path of the tweet file. Defaults to the `gathertweet.file`
  #'   option, falling back to `"tweets.rds"`.
  #' @param save_fun Function called as `save_fun(tweets, file)` to write.
  #' @param read_fun Function called as `read_fun(file, lck = lck)` to read the
  #'   existing file; it may return `NULL`.
  #' @param lck A lock already held by the caller. When `NULL`, an exclusive
  #'   lock is taken here and released when the function exits.
  #' @return The tweets that were written (new rows followed by retained old
  #'   rows), or the unchanged input when there was nothing to save.
  #' @export
  save_tweets <- function(
    tweets,
    file = getOption("gathertweet.file", "tweets.rds"),
    save_fun = saveRDS,
    read_fun = read_tweets,
    lck = NULL
  ) {
    # Nothing to save. The is.null() guard matters: nrow(NULL) is NULL, and
    # `NULL < 1` is logical(0), which makes `if` fail.
    if (is.null(tweets) || nrow(tweets) < 1) return(tweets)

    fs::dir_create(fs::path_dir(file))

    # Only take (and later release) a lock when the caller did not pass one
    if (is.null(lck)) {
      lck <- exclusive_lock(file)
      on.exit(unlock(lck), add = TRUE)
    }
    stopifnot_locked(lck, message = "Unable to acquire lock on {file}")

    if (fs::file_exists(file)) {
      # Don't drop or lose old tweets
      tweets_prev <- read_fun(file, lck = lck)
      if (!is.null(tweets_prev)) {
        tweets_not_new <- anti_join(tweets_prev, tweets, by = "status_id")
        if (nrow(tweets_not_new) > 0) {
          tweets <- bind_rows(tweets, tweets_not_new)
        }
        # Safety net: every previously stored status_id must still be present
        if (length(setdiff(tweets_prev$status_id, tweets$status_id)) != 0) {
          log_fatal("An error occurred that would have lost stored tweets")
        }
      }
    }

    save_fun(tweets, file)
    tweets
  }
  #' Find the largest status ID among stored tweets
  #'
  #' Status IDs are compared as digit strings rather than converted to
  #' numbers: IDs with many digits cannot be represented exactly as doubles,
  #' and converting such a double back to character yields scientific
  #' notation instead of a usable ID.
  #'
  #' @param tweets A data frame of tweets with a `status_id` column. When
  #'   `NULL`, tweets are read from `file` with `read_tweets()`.
  #' @param file Path of the tweet file. Defaults to the `gathertweet.file`
  #'   option, falling back to `"tweets.rds"`.
  #' @return The largest `status_id` as a character string, exactly as stored,
  #'   or `NULL` when there are no tweets or no non-missing IDs.
  #' @export
  last_seen_tweet <- function(
    tweets = NULL,
    file = getOption("gathertweet.file", "tweets.rds")
  ) {
    if (is.null(tweets)) tweets <- read_tweets(file)
    if (is.null(tweets)) return(NULL)

    ids <- as.character(tweets$status_id)
    ids <- ids[!is.na(ids)]
    if (length(ids) == 0) return(NULL)

    # For non-negative integers written without leading zeros, the longer
    # string is the larger number; equal-length strings order bytewise.
    # method = "radix" sorts strings in the C locale, so the result does not
    # depend on the session's collation settings.
    largest_first <- order(nchar(ids), ids, decreasing = TRUE, method = "radix")
    ids[largest_first[1]]
  }
  #' Read stored tweets from a file
  #'
  #' @param file Path of the tweet file. Defaults to the `gathertweet.file`
  #'   option, falling back to `"tweets.rds"`.
  #' @param lck A lock already held by the caller. When `NULL`, a shared lock
  #'   is taken here and released when the function exits.
  #' @return The object stored in `file` (read with `readRDS()`), or `NULL`
  #'   when `file` does not exist.
  #' @export
  read_tweets <- function(
    file = getOption("gathertweet.file", "tweets.rds"),
    lck = NULL
  ) {
    file_is_missing <- !file_exists(file)
    if (file_is_missing) {
      return(NULL)
    }

    # Take our own shared lock only when the caller did not supply one
    caller_holds_lock <- !is.null(lck)
    if (!caller_holds_lock) {
      lck <- shared_lock(file)
      on.exit(unlock(lck))
    }
    stopifnot_locked(lck, message = "Unable to acquire lock on {file}")

    readRDS(file)
  }
  #' Copy the tweet file to a backup next to it
  #'
  #' The backup path comes from `path_add(file)`, whose default suffix is a
  #' timestamp inserted before the file extension.
  #'
  #' @param file Path of the tweet file. Defaults to the `gathertweet.file`
  #'   option, falling back to `"tweets.rds"`.
  #' @param lck A lock already held by the caller. When `NULL`, a shared lock
  #'   is taken here and released when the function exits.
  #' @return The value of `fs::file_copy()`, or `NULL` when `file` does not
  #'   exist.
  #' @export
  backup_tweets <- function(
    file = getOption("gathertweet.file", "tweets.rds"),
    lck = NULL
  ) {
    # No file, nothing to back up
    if (!file_exists(file)) {
      return()
    }

    # Take our own shared lock only when the caller did not supply one
    needs_own_lock <- is.null(lck)
    if (needs_own_lock) {
      lck <- shared_lock(file)
      on.exit(unlock(lck))
    }
    stopifnot_locked(lck, message = "Unable to acquire lock on {file}")

    # `file_backup` is referenced by name in the log message template below
    file_backup <- path_add(file)
    log_info("Backing up tweet file to {file_backup}")
    fs::file_copy(file, file_backup)
  }
  #' Re-fetch the current version of stored tweets
  #'
  #' Looks up every `status_id` in `tweets` again via
  #' `lookup_status_ratelimit()`.
  #'
  #' @param tweets A data frame of tweets with a `status_id` column. When
  #'   `NULL`, tweets are read from `file` with `read_tweets()`.
  #' @param file Path of the tweet file. Defaults to the `gathertweet.file`
  #'   option (the option the other functions in this file read), then the
  #'   legacy `tweets.file` option this function used previously, then
  #'   `"tweets.rds"`.
  #' @param ... Passed on to `lookup_status_ratelimit()`, which forwards them
  #'   to `rtweet::lookup_statuses()`.
  #' @return The looked-up tweets, or `NULL` when there are no stored tweets.
  #' @export
  update_tweets <- function(
    tweets = NULL,
    file = getOption(
      "gathertweet.file",
      getOption("tweets.file", "tweets.rds")
    ),
    ...
  ) {
    if (is.null(tweets)) tweets <- read_tweets(file)
    # read_tweets() returns NULL when the file does not exist; nothing to update
    if (is.null(tweets)) return(NULL)
    lookup_status_ratelimit(tweets$status_id, ...)
  }
  # Look up tweets by status ID in groups of at most 90,000 IDs, pausing until
  # the "statuses/lookup" rate limit resets whenever no requests remain.
  #
  # status_id: vector of status IDs; NULL or an empty vector yields NULL
  #            without contacting the API.
  # ...:       forwarded to rtweet::lookup_statuses().
  #
  # Returns the row-bound results of all groups.
  lookup_status_ratelimit <- function(status_id, ...) {
    n_status <- length(status_id)
    # Nothing to look up. This also avoids iterating over seq(1, 0), which
    # counts down and would run the loop twice with invalid indices.
    if (n_status < 1) return(NULL)

    n_groups <- ceiling(n_status / 90000)
    n_status_large <- n_status > 90000
    rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
    results <- vector("list", n_groups)

    for (idx_group in seq_len(n_groups)) {
      # Rate limit ----
      # The snapshot is stale once its reset time has passed or once an
      # earlier group has spent requests, so refresh it in both cases.
      if (idx_group > 1 || Sys.time() > rate_limit$reset_at) {
        log_debug("Updating out-of-date rate limit")
        rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
      }
      if (rate_limit$remaining < 1) {
        # Wait until the rate limit resets: reset time minus now, so the value
        # is positive while the reset lies in the future.
        wait_s <- difftime(rate_limit$reset_at, Sys.time(), units = "secs")
        log_info("Waiting for rate limit to reset at {rate_limit$reset_at}")
        # Clamp at zero in case the reset passed between the two clock reads
        Sys.sleep(max(0, ceiling(as.numeric(wait_s))))
        rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
      }

      # Get Statuses ----
      if (n_status_large) {
        idx_start <- (idx_group - 1) * 90000 + 1
        idx_end <- min(idx_group * 90000, n_status)
        log_info("Getting tweets {idx_start} to {idx_end} of {n_status}")
      } else {
        idx_start <- 1
        idx_end <- n_status
        log_info("Getting {n_status} tweets")
      }

      # `[<-` with list() keeps the slot even if the lookup returns NULL;
      # `[[<-` with NULL would delete the element and shift later ones.
      results[idx_group] <- list(
        rtweet::lookup_statuses(status_id[idx_start:idx_end], ...)
      )
    }

    # Bind once at the end instead of growing a data frame inside the loop
    bind_rows(results)
  }
  # Build the lock-file path for `file`: the file name gets a "." prefix (no
  # timestamp suffix) and "lock" is added as an extension via `path()`.
  path_lock <- function(file) {
    dotted_file <- path_add(file, append = NULL, prepend = ".")
    path(dotted_file, ext = "lock")
  }
  # Insert text before and/or after the base name of `file`, keeping its
  # directory and extension.
  #
  # file:    path to modify.
  # append:  text added after the base name; defaults to a timestamp
  #          formatted as "_%F_%H%M%S". NULL adds nothing.
  # prepend: text added before the base name. NULL adds nothing.
  path_add <- function(file, append = strftime(Sys.time(), "_%F_%H%M%S"), prepend = NULL) {
    # `append`, `prepend` and `file_base` are referenced by name in the glue
    # template below, so they keep these names.
    append <- if (is.null(append)) "" else append
    prepend <- if (is.null(prepend)) "" else prepend
    file_base <- fs::path_ext_remove(fs::path_file(file))
    new_name <- glue::glue("{prepend}{file_base}{append}")
    path(fs::path_dir(file), new_name, ext = fs::path_ext(file))
  }
  # Check that a lock was obtained; report through log_error() otherwise.
  #
  # lck:     the lock object, or NULL when no lock is held.
  # message: text handed to log_error(). It may reference variables of the
  #          calling function (e.g. "{file}"), so the caller's environment is
  #          passed along as `envir`.
  #
  # Returns TRUE invisibly when `lck` is not NULL; otherwise the result of
  # log_error() (NOTE(review): presumably log_error() aborts -- confirm).
  stopifnot_locked <- function(lck = NULL, message = "Unable to acquire lock") {
    if (!is.null(lck)) return(invisible(TRUE))
    # parent.frame() is the function that called us. sys.frame(1) would be the
    # outermost frame on the call stack, which is a different environment
    # whenever our caller was itself called from another function. Capture it
    # eagerly so lazy evaluation inside log_error() cannot change its meaning.
    caller_env <- parent.frame()
    log_error(message, envir = caller_env)
  }
  # Request a shared (non-exclusive) lock on the lock file belonging to `file`.
  #
  # timeout: forwarded to `lock()`; the default evaluates to 60000
  #          (NOTE(review): presumably milliseconds, i.e. one minute -- confirm
  #          against the `lock()` implementation in use).
  shared_lock <- function(file, timeout = 1 * 60 * 1000) {
    lock_file <- path_lock(file)
    lock(lock_file, exclusive = FALSE, timeout = timeout)
  }
  # Request an exclusive lock on the lock file belonging to `file`.
  #
  # timeout: forwarded to `lock()`; the default evaluates to 60000
  #          (NOTE(review): presumably milliseconds, i.e. one minute -- confirm
  #          against the `lock()` implementation in use).
  exclusive_lock <- function(file, timeout = 1 * 60 * 1000) {
    lock_file <- path_lock(file)
    lock(lock_file, exclusive = TRUE, timeout = timeout)
  }