mirror of
https://github.com/gurnec/removeddit.git
synced 2026-03-11 08:54:27 +00:00
Refactor thread/index.js w/more parallelism/speed
Previously, all Pushshift comments were downloaded and then processed, and only after that were all Reddit comments downloaded and then processed. Now, each batch of Pushshift comments is processed asynchronously as soon as it completes, and Reddit comments are downloaded & processed as soon as enough comment IDs have been batched for submission. This minimizes the amount of time spent in the 'Comparing comments to Reddit API' phase. It also spaces out Reddit API requests for better netiquette.
This commit is contained in:
parent
d93815ae73
commit
c236bf3e5a
5 changed files with 150 additions and 98 deletions
|
|
@ -1,4 +1,4 @@
|
|||
import { fetchJson } from '../../utils'
|
||||
import { fetchJson, sleep } from '../../utils'
|
||||
|
||||
export const chunkSize = 100;
|
||||
const postURL = 'https://api.pushshift.io/reddit/submission/search/?ids='
|
||||
|
|
@ -12,9 +12,6 @@ const errorHandler = (msg, origError, from) => {
|
|||
throw error
|
||||
}
|
||||
|
||||
const sleep = ms =>
|
||||
new Promise(slept => setTimeout(slept, ms))
|
||||
|
||||
class TokenBucket {
|
||||
|
||||
// Refills tokens at a rate of one per msRefillIntvl millis, storing up to size tokens.
|
||||
|
|
@ -74,7 +71,10 @@ export const getPost = async threadID => {
|
|||
}
|
||||
}
|
||||
|
||||
export const getComments = async (allComments, threadID, maxComments, after) => {
|
||||
// The callback() function is called with an Array of comments after each chunk is
|
||||
// retrieved. It should return as quickly as possible (scheduling time-taking work
|
||||
// later), and may return true to cause getComments to exit early, or false otherwise.
|
||||
export const getComments = async (callback, threadID, maxComments, after) => {
|
||||
let chunks = Math.ceil(maxComments / chunkSize), comments, lastCreatedUtc = 1
|
||||
while (true) {
|
||||
|
||||
|
|
@ -92,18 +92,17 @@ export const getComments = async (allComments, threadID, maxComments, after) =>
|
|||
pushshiftTokenBucket.setNextAvail(delay)
|
||||
}
|
||||
}
|
||||
|
||||
comments.forEach(c => allComments.set(c.id, {
|
||||
const exitEarly = callback(comments.map(c => ({
|
||||
...c,
|
||||
parent_id: c.parent_id?.substring(3) || threadID,
|
||||
link_id: c.link_id?.substring(3) || threadID
|
||||
}))
|
||||
})))
|
||||
|
||||
const loadedAllComments = comments.length < chunkSize/2
|
||||
if (comments.length)
|
||||
lastCreatedUtc = comments[comments.length - 1].created_utc
|
||||
if (comments.length < chunkSize/2)
|
||||
return [ lastCreatedUtc, true ]
|
||||
if (chunks <= 1)
|
||||
return [ lastCreatedUtc, false ]
|
||||
if (loadedAllComments || chunks <= 1 || exitEarly)
|
||||
return [ lastCreatedUtc, loadedAllComments ]
|
||||
chunks--
|
||||
after = Math.max(lastCreatedUtc - 1, after + 1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { fetchJson, chunk } from '../../utils'
|
||||
import { fetchJson } from '../../utils'
|
||||
|
||||
export const chunkSize = 100;
|
||||
const baseURL = 'https://api.reddit.com'
|
||||
const requestSettings = {headers: {"Accept-Language": "en"}}
|
||||
|
||||
|
|
@ -35,14 +36,9 @@ export const getPost = (subreddit, threadID) => (
|
|||
// .catch(error => errorHandler(error, 'reddit.getThreads'))
|
||||
//)
|
||||
|
||||
// Helper function that fetches a list of comments
|
||||
const fetchComments = (commentIDs) => (
|
||||
// Fetch multiple comments by id
|
||||
export const getComments = commentIDs => (
|
||||
fetchJson(`${baseURL}/api/info?id=${commentIDs.map(id => `t1_${id}`).join()}`, requestSettings)
|
||||
.then(results => results.data.children.map(({data}) => data))
|
||||
)
|
||||
|
||||
export const getComments = commentIDs => (
|
||||
Promise.all(chunk(commentIDs, 100).map(ids => fetchComments(ids)))
|
||||
.then(results => results.flat())
|
||||
.catch(error => errorHandler(error, 'reddit.getComments'))
|
||||
)
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ const unflatten = (commentMap, root, removed, deleted) => {
|
|||
} else if ((parentComment = commentMap.get(parentID)) !== undefined) {
|
||||
parentComment.replies.push(comment)
|
||||
} else {
|
||||
console.error('MISSING PARENT ID:', parentID, 'for comment', comment)
|
||||
console.warn('Missing parent ID:', parentID, 'for comment', comment)
|
||||
}
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -2,13 +2,14 @@ import React from 'react'
|
|||
import { Link } from 'react-router-dom'
|
||||
import {
|
||||
getPost,
|
||||
getComments as getRedditComments
|
||||
getComments as getRedditComments,
|
||||
chunkSize as redditChunkSize
|
||||
} from '../../api/reddit'
|
||||
import {
|
||||
getPost as getRemovedPost,
|
||||
getComments as getPushshiftComments
|
||||
} from '../../api/pushshift'
|
||||
import { isDeleted, isRemoved } from '../../utils'
|
||||
import { isDeleted, isRemoved, sleep } from '../../utils'
|
||||
import { connect, constrainMaxComments } from '../../state'
|
||||
import Post from '../common/Post'
|
||||
import CommentSection from './CommentSection'
|
||||
|
|
@ -16,6 +17,42 @@ import SortBy from './SortBy'
|
|||
import CommentInfo from './CommentInfo'
|
||||
import LoadMore from './LoadMore'
|
||||
|
||||
// A FIFO queue whose items are grouped into chunks of at most chunkSize items,
// so that callers can pull off one full batch at a time as soon as it is ready.
class ChunkedQueue {

  // chunkSize: the maximum number of items held by a single chunk.
  // Throws a RangeError unless chunkSize compares greater than zero
  // (which also rejects undefined and NaN).
  constructor(chunkSize) {
    if (!(chunkSize > 0))
      throw new RangeError('chunkSize must be > 0')
    this._chunkSize = chunkSize
    this._chunks = [[]]  // Array of Arrays
    // Invariant: this._chunks always contains at least one Array
  }

  // Appends x to the last chunk, starting a new chunk if the last one is full.
  push(x) {
    const last = this._chunks[this._chunks.length - 1]
    if (last.length < this._chunkSize)
      last.push(x)
    else
      this._chunks.push([x])
  }

  // Returns true if the first (oldest) chunk holds at least chunkSize items.
  hasFullChunk() {
    return this._chunks[0].length >= this._chunkSize
  }

  // Removes and returns the first (oldest) chunk as an Array. The returned chunk
  // may be partially filled, or empty if nothing has been queued.
  shiftChunk() {
    const first = this._chunks.shift()
    if (this._chunks.length === 0)
      this._chunks.push([])  // restore the invariant
    return first
  }

  // Removes every queued item and returns them all, in insertion order, as one
  // flat Array (only the chunk level is flattened; items are left untouched).
  shiftAll() {
    const all = this._chunks.flat()
    this._chunks = [[]]
    return all
  }
}
|
||||
|
||||
class Thread extends React.Component {
|
||||
state = {
|
||||
post: {},
|
||||
|
|
@ -96,80 +133,106 @@ class Thread extends React.Component {
|
|||
|
||||
getComments (newCommentCount, after) {
|
||||
const { threadID } = this.props.match.params
|
||||
const pushshiftCommentLookup = this.state.pushshiftCommentLookup
|
||||
const { pushshiftCommentLookup, removed, deleted } = this.state
|
||||
const redditIdQueue = new ChunkedQueue(redditChunkSize)
|
||||
const pushshiftPromises = [], redditPromises = []
|
||||
let redditError = false, doRedditComments
|
||||
|
||||
// Get comment ids from pushshift
|
||||
getPushshiftComments(pushshiftCommentLookup, threadID, newCommentCount, after)
|
||||
.then(([lastCreatedUtc, loadedAllComments]) => {
|
||||
console.log(`Pushshift: ${pushshiftCommentLookup.size} comments`)
|
||||
const ids = []
|
||||
const missingIds = new Set()
|
||||
this.lastCreatedUtc = lastCreatedUtc
|
||||
|
||||
// Extract ids from pushshift response
|
||||
pushshiftCommentLookup.forEach(comment => {
|
||||
ids.push(comment.id)
|
||||
if (comment.parent_id != threadID &&
|
||||
!pushshiftCommentLookup.has(comment.parent_id) &&
|
||||
!missingIds.has(comment.parent_id)) {
|
||||
ids.push(comment.parent_id)
|
||||
missingIds.add(comment.parent_id)
|
||||
// Process a chunk of comments downloaded from Pushshift (started below)
|
||||
const processPushshiftComments = comments => {
|
||||
pushshiftPromises.push(sleep(0).then(() => {
|
||||
let count = 0
|
||||
comments.forEach(comment => {
|
||||
const { id, parent_id } = comment
|
||||
if (!pushshiftCommentLookup.has(id)) {
|
||||
pushshiftCommentLookup.set(id, comment)
|
||||
redditIdQueue.push(id)
|
||||
count++
|
||||
if (parent_id != threadID && !pushshiftCommentLookup.has(parent_id)) {
|
||||
pushshiftCommentLookup.set(parent_id, undefined)
|
||||
redditIdQueue.push(parent_id)
|
||||
}
|
||||
}
|
||||
});
|
||||
missingIds.clear()
|
||||
})
|
||||
while (redditIdQueue.hasFullChunk())
|
||||
doRedditComments(redditIdQueue.shiftChunk())
|
||||
return count
|
||||
}))
|
||||
return redditError // causes getPushshiftComments() to exit early on a Reddit error
|
||||
}
|
||||
|
||||
// Get all the comments from reddit
|
||||
// Download a list of comments by id from Reddit, and process them
|
||||
doRedditComments = ids => redditPromises.push(getRedditComments(ids)
|
||||
.then(comments => {
|
||||
comments.forEach(comment => {
|
||||
let pushshiftComment = pushshiftCommentLookup.get(comment.id)
|
||||
if (pushshiftComment === undefined) {
|
||||
// When a parent comment is missing from pushshift, use the reddit comment instead
|
||||
comment.parent_id = comment.parent_id.substring(3)
|
||||
comment.link_id = comment.link_id.substring(3)
|
||||
pushshiftComment = comment
|
||||
pushshiftCommentLookup.set(comment.id, pushshiftComment)
|
||||
} else {
|
||||
// Replace pushshift score with reddit (it's usually more accurate)
|
||||
pushshiftComment.score = comment.score
|
||||
}
|
||||
|
||||
// Check what is removed / deleted according to reddit
|
||||
if (isRemoved(comment.body)) {
|
||||
removed.push(comment.id)
|
||||
pushshiftComment.removed = true
|
||||
} else if (isDeleted(comment.body)) {
|
||||
deleted.push(comment.id)
|
||||
pushshiftComment.deleted = true
|
||||
} else if (pushshiftComment !== comment) {
|
||||
if (isRemoved(pushshiftComment.body)) {
|
||||
// If it's deleted in pushshift, but later restored by a mod, use the restored
|
||||
comment.parent_id = comment.parent_id.substring(3)
|
||||
comment.link_id = comment.link_id.substring(3)
|
||||
pushshiftCommentLookup.set(comment.id, comment)
|
||||
} else if (pushshiftComment.body != comment.body) {
|
||||
pushshiftComment.edited_body = comment.body
|
||||
pushshiftComment.edited = comment.edited
|
||||
}
|
||||
}
|
||||
})
|
||||
return comments.length
|
||||
})
|
||||
.catch(error => {
|
||||
this.props.global.setError(error, error.helpUrl)
|
||||
redditError = true
|
||||
})
|
||||
)
|
||||
|
||||
// Download comments from Pushshift, and process each chunk (above) as it's retrieved
|
||||
getPushshiftComments(processPushshiftComments, threadID, newCommentCount, after)
|
||||
.then(([lastCreatedUtc, loadedAllComments]) => {
|
||||
this.lastCreatedUtc = lastCreatedUtc
|
||||
if (redditError)
|
||||
return
|
||||
this.props.global.setLoading('Comparing comments to Reddit API...')
|
||||
return getRedditComments(ids)
|
||||
.then(redditComments => {
|
||||
console.log(`Reddit: ${redditComments.length} comments`)
|
||||
const removed = []
|
||||
const deleted = []
|
||||
|
||||
redditComments.forEach(comment => {
|
||||
let pushshiftComment = pushshiftCommentLookup.get(comment.id)
|
||||
if (pushshiftComment === undefined) {
|
||||
// When a parent comment is missing from pushshift, use the reddit comment instead
|
||||
comment.parent_id = comment.parent_id.substring(3)
|
||||
comment.link_id = comment.link_id.substring(3)
|
||||
pushshiftComment = comment
|
||||
pushshiftCommentLookup.set(comment.id, pushshiftComment)
|
||||
} else {
|
||||
// Replace pushshift score with reddit (it's usually more accurate)
|
||||
pushshiftComment.score = comment.score
|
||||
}
|
||||
// All comments have been retrieved from Pushshift; wait for processing to finish
|
||||
Promise.all(pushshiftPromises).then(lengths => {
|
||||
console.log('Pushshift:', lengths.reduce((a,b) => a+b, 0), 'comments')
|
||||
|
||||
// Check what is removed / deleted according to reddit
|
||||
if (isRemoved(comment.body)) {
|
||||
removed.push(comment.id)
|
||||
pushshiftComment.removed = true
|
||||
} else if (isDeleted(comment.body)) {
|
||||
deleted.push(comment.id)
|
||||
pushshiftComment.deleted = true
|
||||
} else if (pushshiftComment !== comment) {
|
||||
if (isRemoved(pushshiftComment.body)) {
|
||||
// If it's deleted in pushshift, but later restored by a mod, use the restored
|
||||
comment.parent_id = comment.parent_id.substring(3)
|
||||
comment.link_id = comment.link_id.substring(3)
|
||||
pushshiftCommentLookup.set(comment.id, comment)
|
||||
} else if (pushshiftComment.body != comment.body) {
|
||||
pushshiftComment.edited_body = comment.body
|
||||
pushshiftComment.edited = comment.edited
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
this.props.global.setSuccess()
|
||||
this.setState({
|
||||
pushshiftCommentLookup,
|
||||
removed,
|
||||
deleted,
|
||||
loadedAllComments,
|
||||
loadingComments: false,
|
||||
reloadingComments: false
|
||||
})
|
||||
// All comments from Pushshift have been processed; wait for Reddit to finish
|
||||
doRedditComments(redditIdQueue.shiftAll())
|
||||
Promise.all(redditPromises).then(lengths => {
|
||||
console.log('Reddit:', lengths.reduce((a,b) => a+b, 0), 'comments')
|
||||
if (!redditError) {
|
||||
this.props.global.setSuccess()
|
||||
this.setState({
|
||||
pushshiftCommentLookup,
|
||||
removed,
|
||||
deleted,
|
||||
loadedAllComments,
|
||||
loadingComments: false,
|
||||
reloadingComments: false
|
||||
})
|
||||
}
|
||||
})
|
||||
.catch(e => this.props.global.setError(e, e.helpUrl))
|
||||
})
|
||||
})
|
||||
.catch(e => this.props.global.setError(e, e.helpUrl))
|
||||
}
|
||||
|
|
|
|||
10
src/utils.js
10
src/utils.js
|
|
@ -18,14 +18,8 @@ export const fetchJson = (url, init = {}) =>
|
|||
})
|
||||
)
|
||||
|
||||
// Take one big array and split it into an array of chunks of the given size
|
||||
export const chunk = (arr, size) => {
|
||||
const chunks = []
|
||||
for (let i = 0; i < arr.length; i += size) {
|
||||
chunks.push(arr.slice(i, i + size))
|
||||
}
|
||||
return chunks
|
||||
}
|
||||
// Returns a Promise that is resolved (with no value) by a setTimeout of ms milliseconds.
export const sleep = ms =>
|
||||
new Promise(slept => setTimeout(slept, ms))
|
||||
|
||||
// Reddit's way of indicating that something is deleted (the '\\' is for Reddit and the other is for pushshift)
|
||||
// True only when textBody is exactly one of the two "deleted" placeholder strings.
export const isDeleted = textBody => ['\\[deleted\\]', '[deleted]'].includes(textBody)
|
||||
|
|
|
|||
Loading…
Reference in a new issue