You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

157 lines
4.2KB

  1. #' @export
  2. save_tweets <- function(
  3. tweets,
  4. file = getOption("gathertweet.file", "tweets.rds"),
  5. save_fun = saveRDS,
  6. read_fun = read_tweets,
  7. lck = NULL
  8. ) {
  9. if (nrow(tweets) < 1) return(tweets)
  10. fs::dir_create(fs::path_dir(file))
  11. if (is.null(lck)) {
  12. lck <- exclusive_lock(file)
  13. on.exit(unlock(lck))
  14. }
  15. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  16. if (fs::file_exists(file)) {
  17. # Don't drop or lose old tweets
  18. tweets_prev <- read_fun(file, lck = lck)
  19. status_not_new <- setdiff(tweets_prev$status_id, tweets$status_id)
  20. if (length(status_not_new)) {
  21. tweets <- rbind(
  22. tweets,
  23. tweets_prev[tweets_prev$status_id %in% status_not_new, ]
  24. )
  25. }
  26. stopifnot(length(setdiff(tweets_prev$status_id, tweets$status_id)) == 0)
  27. }
  28. save_fun(tweets, file)
  29. tweets
  30. }
  31. #' @export
  32. last_seen_tweet <- function(
  33. tweets = NULL,
  34. file = getOption("gathertweet.file", "tweets.rds")
  35. ) {
  36. if (is.null(tweets)) tweets <- read_tweets(file)
  37. if (is.null(tweets)) return(NULL)
  38. tweets$status_id %>%
  39. as.numeric() %>%
  40. max() %>%
  41. as.character()
  42. }
  43. #' @export
  44. read_tweets <- function(
  45. file = getOption("gathertweet.file", "tweets.rds"),
  46. lck = NULL
  47. ) {
  48. if (!file_exists(file)) return(NULL)
  49. if (is.null(lck)) {
  50. lck <- shared_lock(file)
  51. on.exit(unlock(lck))
  52. }
  53. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  54. readRDS(file)
  55. }
  56. #' @export
  57. backup_tweets <- function(
  58. file = getOption("gathertweet.file", "tweets.rds"),
  59. lck = NULL
  60. ) {
  61. if (!file_exists(file)) return()
  62. if (is.null(lck)) {
  63. lck <- shared_lock(file)
  64. on.exit(unlock(lck))
  65. }
  66. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  67. file_backup <- path_add(file)
  68. log_info("Backing up tweet file to {file_backup}")
  69. fs::file_copy(file, file_backup)
  70. }
  71. #' @export
  72. update_tweets <- function(
  73. tweets = NULL,
  74. file = getOption("tweets.file", "tweets.rds"),
  75. ...
  76. ) {
  77. if (is.null(tweets)) tweets <- read_tweets(file)
  78. lookup_status_ratelimit(tweets$status_id, ...)
  79. }
  80. lookup_status_ratelimit <- function(status_id, ...) {
  81. tweets <- NULL
  82. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  83. fetch_count <- 0
  84. n_status <- length(status_id)
  85. n_status_large <- n_status > 90000
  86. for (idx_group in seq(1, ceiling(n_status/90000))) {
  87. # Rate limit ----
  88. # Track rate limit and wait it out if needed
  89. if (Sys.time() > rate_limit$reset_at) {
  90. log_debug("Updating out-of-date rate limit")
  91. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  92. }
  93. if (rate_limit$remaining - fetch_count < 1) {
  94. # wait until rate limit resets
  95. wait_s <- difftime(Sys.time(), rate_limit$reset_at, units = "sec")
  96. log_info("Waiting for rate limit to reset at {rate_limit$reset_at}")
  97. Sys.sleep(ceiling(as.numeric(wait_s)))
  98. }
  99. if (fetch_count > 0 && fetch_count %% 50 == 0) {
  100. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  101. }
  102. # Get Statuses ----
  103. if (n_status_large) {
  104. idx_start <- (idx_group - 1) * 90000 + 1
  105. idx_end <- min(idx_group * 90000, n_status)
  106. log_info("Getting tweets {idx_start} to {idx_end} of {n_status}")
  107. } else {
  108. idx_start <- 1
  109. idx_end <- n_status
  110. log_info("Getting {n_status} tweets")
  111. }
  112. tweets <- rbind(
  113. tweets,
  114. rtweet::lookup_statuses(status_id[idx_start:idx_end])
  115. )
  116. }
  117. tweets
  118. }
  119. path_lock <- function(file) {
  120. path(path_add(file, NULL, prepend = "."), ext = "lock")
  121. }
  122. path_add <- function(file, append = strftime(Sys.time(), "_%F_%H%M%S"), prepend = NULL) {
  123. if (is.null(append)) append <- ""
  124. if (is.null(prepend)) prepend <- ""
  125. file_base <- fs::path_ext_remove(fs::path_file(file))
  126. file_ext <- fs::path_ext(file)
  127. file_dir <- fs::path_dir(file)
  128. path(file_dir,
  129. glue::glue("{prepend}{file_base}{append}"),
  130. ext = file_ext)
  131. }
  132. stopifnot_locked <- function(lck = NULL, message = "Unable to aquire lock") {
  133. if (!is.null(lck)) return(invisible(TRUE))
  134. log_error(message, envir = sys.frame(1))
  135. }
  136. shared_lock <- function(file, timeout = 1 * 60 * 1000) {
  137. lock(path_lock(file), exclusive = FALSE, timeout = timeout)
  138. }
  139. exclusive_lock <- function(file, timeout = 1 * 60 * 1000) {
  140. lock(path_lock(file), exclusive = TRUE, timeout = timeout)
  141. }