You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

249 lines
6.7KB

  1. #' @export
  2. save_tweets <- function(
  3. tweets,
  4. file = getOption("gathertweet.file", "tweets.rds"),
  5. save_fun = saveRDS,
  6. read_fun = read_tweets,
  7. lck = NULL,
  8. key_var = "status_id"
  9. ) {
  10. if (nrow(tweets) < 1) return(tweets)
  11. fs::dir_create(fs::path_dir(file))
  12. if (is.null(lck)) {
  13. lck <- exclusive_lock(file)
  14. on.exit(unlock(lck))
  15. }
  16. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  17. if (fs::file_exists(file)) {
  18. # Don't drop or lose old tweets
  19. tweets_prev <- read_fun(file, lck = lck)
  20. if (!is.null(tweets_prev)) {
  21. tweets_not_new <- anti_join(tweets_prev, tweets, by = key_var)
  22. if (nrow(tweets_not_new)) {
  23. tweets <- bind_rows(tweets, tweets_not_new)
  24. }
  25. if (length(setdiff(tweets_prev[[key_var]], tweets[[key_var]])) != 0) {
  26. log_fatal("An error occurred that would have lost stored tweets")
  27. }
  28. }
  29. }
  30. save_fun(tweets, file)
  31. tweets
  32. }
  33. #' @export
  34. last_seen_tweet <- function(
  35. tweets = NULL,
  36. file = getOption("gathertweet.file", "tweets.rds")
  37. ) {
  38. if (is.null(tweets)) tweets <- read_tweets(file)
  39. if (is.null(tweets)) return(NULL)
  40. tweets$status_id %>%
  41. as.numeric() %>%
  42. max() %>%
  43. as.character()
  44. }
  45. #' @export
  46. read_tweets <- function(
  47. file = getOption("gathertweet.file", "tweets.rds"),
  48. lck = NULL
  49. ) {
  50. if (!file_exists(file)) return(NULL)
  51. if (is.null(lck)) {
  52. lck <- shared_lock(file)
  53. on.exit(unlock(lck))
  54. }
  55. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  56. readRDS(file)
  57. }
  58. #' @export
  59. backup_tweets <- function(
  60. file = getOption("gathertweet.file", "tweets.rds"),
  61. backup_dir = "backups",
  62. lck = NULL
  63. ) {
  64. if (!file_exists(file)) return()
  65. if (is.null(lck)) {
  66. lck <- shared_lock(file)
  67. on.exit(unlock(lck))
  68. }
  69. stopifnot_locked(lck, message = "Unable to acquire lock on {file}")
  70. file_backup <- path(fs::path_dir(file), backup_dir, fs::path_file(file))
  71. file_backup <- path_add(file_backup)
  72. fs::dir_create(fs::path_dir(file_backup))
  73. log_info("Backing up tweet file to {file_backup}")
  74. fs::file_copy(file, file_backup)
  75. }
  76. #' @export
  77. simplify_tweets <- function(
  78. tweets = NULL,
  79. file = getOption("gathertweet.file", "tweets.rds"),
  80. ...,
  81. .fields = NULL
  82. ) {
  83. if (is.null(tweets)) tweets <- read_tweets(file)
  84. if (is.null(tweets)) return(NULL)
  85. .fields <- c(list(...), .fields)
  86. if (length(.fields)) {
  87. tweets %>% dplyr::select(!!!.fields)
  88. } else {
  89. tw_cols <- names(tweets)
  90. keep_cols <- c(
  91. "created_at",
  92. "status_id",
  93. "user_id",
  94. "screen_name",
  95. "text",
  96. "is_quote",
  97. "is_retweet",
  98. "favorite_count",
  99. "retweet_count",
  100. "hashtags",
  101. "profile_url",
  102. "profile_image_url",
  103. "urls_expanded_url",
  104. "mentions_screen_name",
  105. "media_url",
  106. "urls_url",
  107. "ext_media_url",
  108. "status_url",
  109. grep("^reply_to_", tw_cols, value = TRUE),
  110. "quoted_status_id",
  111. "retweet_status_id",
  112. "quoted_status_id",
  113. grep("^mentions_", tw_cols, value = TRUE)
  114. )
  115. tweets[, intersect(tw_cols, keep_cols)]
  116. }
  117. }
  118. #' @export
  119. update_tweets <- function(
  120. tweets = NULL,
  121. file = getOption("tweets.file", "tweets.rds"),
  122. ...
  123. ) {
  124. if (is.null(tweets)) tweets <- read_tweets(file)
  125. lookup_status_ratelimit(tweets$status_id, ...)
  126. }
  127. lookup_status_ratelimit <- function(status_id, ...) {
  128. tweets <- NULL
  129. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  130. fetch_count <- 0
  131. n_status <- length(status_id)
  132. n_status_large <- n_status > 90000
  133. for (idx_group in seq(1, ceiling(n_status/90000))) {
  134. # Rate limit ----
  135. # Track rate limit and wait it out if needed
  136. if (Sys.time() > rate_limit$reset_at) {
  137. log_debug("Updating out-of-date rate limit")
  138. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  139. }
  140. if (rate_limit$remaining - fetch_count < 1) {
  141. # wait until rate limit resets
  142. wait_s <- difftime(Sys.time(), rate_limit$reset_at, units = "sec")
  143. log_info("Waiting for rate limit to reset at {rate_limit$reset_at}")
  144. Sys.sleep(ceiling(as.numeric(wait_s)))
  145. }
  146. if (fetch_count > 0 && fetch_count %% 50 == 0) {
  147. rate_limit <- rtweet::rate_limits(query = "statuses/lookup")
  148. }
  149. # Get Statuses ----
  150. if (n_status_large) {
  151. idx_start <- (idx_group - 1) * 90000 + 1
  152. idx_end <- min(idx_group * 90000, n_status)
  153. log_info("Getting tweets {idx_start} to {idx_end} of {n_status}")
  154. } else {
  155. idx_start <- 1
  156. idx_end <- n_status
  157. log_info("Getting {n_status} tweets")
  158. }
  159. tweets <- bind_rows(
  160. tweets,
  161. rtweet::lookup_statuses(status_id[idx_start:idx_end], ...)
  162. )
  163. }
  164. tweets
  165. }
  166. path_lock <- function(file) {
  167. path(path_add(file, NULL, prepend = "."), ext = "lock")
  168. }
  169. path_add <- function(file, append = strftime(Sys.time(), "_%F_%H%M%S"), prepend = NULL) {
  170. if (is.null(append)) append <- ""
  171. if (is.null(prepend)) prepend <- ""
  172. file_base <- fs::path_ext_remove(fs::path_file(file))
  173. file_ext <- fs::path_ext(file)
  174. file_dir <- fs::path_dir(file)
  175. path(file_dir,
  176. glue::glue("{prepend}{file_base}{append}"),
  177. ext = file_ext)
  178. }
  179. stopifnot_locked <- function(lck = NULL, message = "Unable to aquire lock") {
  180. if (!is.null(lck)) return(invisible(TRUE))
  181. log_error(message, envir = sys.frame(1))
  182. }
  183. shared_lock <- function(file, timeout = 1 * 60 * 1000) {
  184. lock(path_lock(file), exclusive = FALSE, timeout = timeout)
  185. }
  186. exclusive_lock <- function(file, timeout = 1 * 60 * 1000) {
  187. lock(path_lock(file), exclusive = TRUE, timeout = timeout)
  188. }
  189. #' @title Get user info
  190. #' @param file The file where tweets are located. The text `_users` is
  191. #' automatically appended to this file name.
  192. #' @export
  193. get_user_info <- function(
  194. tweets = NULL,
  195. file = getOption("gathertweet.file", "tweets.rds"),
  196. dir_profile_images = NULL
  197. ) {
  198. if (is.null(tweets)) read_tweets(file)
  199. user_file <- path_add(file, append = "_users")
  200. users <- tweets %>%
  201. rtweet::users_data() %>%
  202. dplyr::distinct()
  203. users <- save_tweets(users, user_file, key_var = "user_id")
  204. if (!is.null(dir_profile_images)) {
  205. rs <- lapply(users$profile_image_url, download_profile_images, output_dir = dir_profile_images)
  206. }
  207. return(users)
  208. }
  209. download_profile_images <- function(profile_image_url, ..., output_dir = "data") {
  210. output_file <- sub("^.+?profile", "profile", profile_image_url)
  211. output_file <- fs::path(output_dir, output_file)
  212. fs::dir_create(fs::path_dir(output_file), recursive = TRUE)
  213. download_file(profile_image_url, output_file)
  214. }
  215. download_file <- function(url, dest) {
  216. if (fs::file_exists(dest)) return(dest)
  217. x <- list(result = NULL, error = NULL)
  218. x$result <- tryCatch({
  219. download.file(url, dest)
  220. dest
  221. }, error = function(e) x$error <<- e$message)
  222. if (!is.null(x$error)) {
  223. log_warn("Error downloading {dest}: {x$error}")
  224. } else x$result
  225. }