Refactor thread/index.js w/more parallelism/speed

Beforehand, Pushshift comments were all downloaded first, then they were
processed, then all Reddit comments were downloaded, then processed.
Now, as each batch of Pushshift comments completes, it is processed
asynchronously, and Reddit comments are downloaded & processed as soon
as enough comment IDs have been batched for submission. This minimizes
the amount of time spent in the 'Comparing comments to Reddit API'
phase. It also spaces out Reddit API requests for better netiquette.
This commit is contained in:
Christopher Gurnee 2022-03-03 14:10:20 -05:00
parent d93815ae73
commit c236bf3e5a
5 changed files with 150 additions and 98 deletions

View file

@ -1,4 +1,4 @@
import { fetchJson } from '../../utils'
import { fetchJson, sleep } from '../../utils'
export const chunkSize = 100;
const postURL = 'https://api.pushshift.io/reddit/submission/search/?ids='
@ -12,9 +12,6 @@ const errorHandler = (msg, origError, from) => {
throw error
}
const sleep = ms =>
new Promise(slept => setTimeout(slept, ms))
class TokenBucket {
// Refills tokens at a rate of one per msRefillIntvl millis, storing up to size tokens.
@ -74,7 +71,10 @@ export const getPost = async threadID => {
}
}
export const getComments = async (allComments, threadID, maxComments, after) => {
// The callback() function is called with an Array of comments after each chunk is
// retrieved. It should return as quickly as possible (scheduling time-taking work
// later), and may return false to cause getComments to exit early, or true otherwise.
export const getComments = async (callback, threadID, maxComments, after) => {
let chunks = Math.ceil(maxComments / chunkSize), comments, lastCreatedUtc = 1
while (true) {
@ -92,18 +92,17 @@ export const getComments = async (allComments, threadID, maxComments, after) =>
pushshiftTokenBucket.setNextAvail(delay)
}
}
comments.forEach(c => allComments.set(c.id, {
const exitEarly = callback(comments.map(c => ({
...c,
parent_id: c.parent_id?.substring(3) || threadID,
link_id: c.link_id?.substring(3) || threadID
}))
})))
const loadedAllComments = comments.length < chunkSize/2
if (comments.length)
lastCreatedUtc = comments[comments.length - 1].created_utc
if (comments.length < chunkSize/2)
return [ lastCreatedUtc, true ]
if (chunks <= 1)
return [ lastCreatedUtc, false ]
if (loadedAllComments || chunks <= 1 || exitEarly)
return [ lastCreatedUtc, loadedAllComments ]
chunks--
after = Math.max(lastCreatedUtc - 1, after + 1)
}

View file

@ -1,5 +1,6 @@
import { fetchJson, chunk } from '../../utils'
import { fetchJson } from '../../utils'
export const chunkSize = 100;
const baseURL = 'https://api.reddit.com'
const requestSettings = {headers: {"Accept-Language": "en"}}
@ -35,14 +36,9 @@ export const getPost = (subreddit, threadID) => (
// .catch(error => errorHandler(error, 'reddit.getThreads'))
//)
// Helper function that fetches a list of comments
const fetchComments = (commentIDs) => (
// Fetch multiple comments by id
export const getComments = commentIDs => (
fetchJson(`${baseURL}/api/info?id=${commentIDs.map(id => `t1_${id}`).join()}`, requestSettings)
.then(results => results.data.children.map(({data}) => data))
)
export const getComments = commentIDs => (
Promise.all(chunk(commentIDs, 100).map(ids => fetchComments(ids)))
.then(results => results.flat())
.catch(error => errorHandler(error, 'reddit.getComments'))
)

View file

@ -22,7 +22,7 @@ const unflatten = (commentMap, root, removed, deleted) => {
} else if ((parentComment = commentMap.get(parentID)) !== undefined) {
parentComment.replies.push(comment)
} else {
console.error('MISSING PARENT ID:', parentID, 'for comment', comment)
console.warn('Missing parent ID:', parentID, 'for comment', comment)
}
})

View file

@ -2,13 +2,14 @@ import React from 'react'
import { Link } from 'react-router-dom'
import {
getPost,
getComments as getRedditComments
getComments as getRedditComments,
chunkSize as redditChunkSize
} from '../../api/reddit'
import {
getPost as getRemovedPost,
getComments as getPushshiftComments
} from '../../api/pushshift'
import { isDeleted, isRemoved } from '../../utils'
import { isDeleted, isRemoved, sleep } from '../../utils'
import { connect, constrainMaxComments } from '../../state'
import Post from '../common/Post'
import CommentSection from './CommentSection'
@ -16,6 +17,42 @@ import SortBy from './SortBy'
import CommentInfo from './CommentInfo'
import LoadMore from './LoadMore'
class ChunkedQueue {
constructor(chunkSize) {
if (!(chunkSize > 0))
throw RangeError('chunkSize must be > 0')
this._chunkSize = chunkSize
this._chunks = [[]] // Array of Arrays
// Invariant: this._chunks always contains at least one Array
}
push(x) {
const last = this._chunks[this._chunks.length - 1]
if (last.length < this._chunkSize)
last.push(x)
else
this._chunks.push([x])
}
hasFullChunk() {
return this._chunks[0].length >= this._chunkSize
}
shiftChunk() {
const first = this._chunks.shift()
if (this._chunks.length == 0)
this._chunks.push([])
return first
}
shiftAll() {
const all = this._chunks.flat()
this._chunks = [[]]
return all
}
}
class Thread extends React.Component {
state = {
post: {},
@ -96,80 +133,106 @@ class Thread extends React.Component {
getComments (newCommentCount, after) {
const { threadID } = this.props.match.params
const pushshiftCommentLookup = this.state.pushshiftCommentLookup
const { pushshiftCommentLookup, removed, deleted } = this.state
const redditIdQueue = new ChunkedQueue(redditChunkSize)
const pushshiftPromises = [], redditPromises = []
let redditError = false, doRedditComments
// Get comment ids from pushshift
getPushshiftComments(pushshiftCommentLookup, threadID, newCommentCount, after)
.then(([lastCreatedUtc, loadedAllComments]) => {
console.log(`Pushshift: ${pushshiftCommentLookup.size} comments`)
const ids = []
const missingIds = new Set()
this.lastCreatedUtc = lastCreatedUtc
// Extract ids from pushshift response
pushshiftCommentLookup.forEach(comment => {
ids.push(comment.id)
if (comment.parent_id != threadID &&
!pushshiftCommentLookup.has(comment.parent_id) &&
!missingIds.has(comment.parent_id)) {
ids.push(comment.parent_id)
missingIds.add(comment.parent_id)
// Process a chunk of comments downloaded from Pushshift (started below)
const processPushshiftComments = comments => {
pushshiftPromises.push(sleep(0).then(() => {
let count = 0
comments.forEach(comment => {
const { id, parent_id } = comment
if (!pushshiftCommentLookup.has(id)) {
pushshiftCommentLookup.set(id, comment)
redditIdQueue.push(id)
count++
if (parent_id != threadID && !pushshiftCommentLookup.has(parent_id)) {
pushshiftCommentLookup.set(parent_id, undefined)
redditIdQueue.push(parent_id)
}
}
});
missingIds.clear()
})
while (redditIdQueue.hasFullChunk())
doRedditComments(redditIdQueue.shiftChunk())
return count
}))
return redditError // causes getPushshiftComments() to exit early on a Reddit error
}
// Get all the comments from reddit
// Download a list of comments by id from Reddit, and process them
doRedditComments = ids => redditPromises.push(getRedditComments(ids)
.then(comments => {
comments.forEach(comment => {
let pushshiftComment = pushshiftCommentLookup.get(comment.id)
if (pushshiftComment === undefined) {
// When a parent comment is missing from pushshift, use the reddit comment instead
comment.parent_id = comment.parent_id.substring(3)
comment.link_id = comment.link_id.substring(3)
pushshiftComment = comment
pushshiftCommentLookup.set(comment.id, pushshiftComment)
} else {
// Replace pushshift score with reddit (it's usually more accurate)
pushshiftComment.score = comment.score
}
// Check what is removed / deleted according to reddit
if (isRemoved(comment.body)) {
removed.push(comment.id)
pushshiftComment.removed = true
} else if (isDeleted(comment.body)) {
deleted.push(comment.id)
pushshiftComment.deleted = true
} else if (pushshiftComment !== comment) {
if (isRemoved(pushshiftComment.body)) {
// If it's deleted in pushshift, but later restored by a mod, use the restored
comment.parent_id = comment.parent_id.substring(3)
comment.link_id = comment.link_id.substring(3)
pushshiftCommentLookup.set(comment.id, comment)
} else if (pushshiftComment.body != comment.body) {
pushshiftComment.edited_body = comment.body
pushshiftComment.edited = comment.edited
}
}
})
return comments.length
})
.catch(error => {
this.props.global.setError(error, error.helpUrl)
redditError = true
})
)
// Download comments from Pushshift, and process each chunk (above) as it's retrieved
getPushshiftComments(processPushshiftComments, threadID, newCommentCount, after)
.then(([lastCreatedUtc, loadedAllComments]) => {
this.lastCreatedUtc = lastCreatedUtc
if (redditError)
return
this.props.global.setLoading('Comparing comments to Reddit API...')
return getRedditComments(ids)
.then(redditComments => {
console.log(`Reddit: ${redditComments.length} comments`)
const removed = []
const deleted = []
redditComments.forEach(comment => {
let pushshiftComment = pushshiftCommentLookup.get(comment.id)
if (pushshiftComment === undefined) {
// When a parent comment is missing from pushshift, use the reddit comment instead
comment.parent_id = comment.parent_id.substring(3)
comment.link_id = comment.link_id.substring(3)
pushshiftComment = comment
pushshiftCommentLookup.set(comment.id, pushshiftComment)
} else {
// Replace pushshift score with reddit (it's usually more accurate)
pushshiftComment.score = comment.score
}
// All comments have been retrieved from Pushshift; wait for processing to finish
Promise.all(pushshiftPromises).then(lengths => {
console.log('Pushshift:', lengths.reduce((a,b) => a+b, 0), 'comments')
// Check what is removed / deleted according to reddit
if (isRemoved(comment.body)) {
removed.push(comment.id)
pushshiftComment.removed = true
} else if (isDeleted(comment.body)) {
deleted.push(comment.id)
pushshiftComment.deleted = true
} else if (pushshiftComment !== comment) {
if (isRemoved(pushshiftComment.body)) {
// If it's deleted in pushshift, but later restored by a mod, use the restored
comment.parent_id = comment.parent_id.substring(3)
comment.link_id = comment.link_id.substring(3)
pushshiftCommentLookup.set(comment.id, comment)
} else if (pushshiftComment.body != comment.body) {
pushshiftComment.edited_body = comment.body
pushshiftComment.edited = comment.edited
}
}
})
this.props.global.setSuccess()
this.setState({
pushshiftCommentLookup,
removed,
deleted,
loadedAllComments,
loadingComments: false,
reloadingComments: false
})
// All comments from Pushshift have been processed; wait for Reddit to finish
doRedditComments(redditIdQueue.shiftAll())
Promise.all(redditPromises).then(lengths => {
console.log('Reddit:', lengths.reduce((a,b) => a+b, 0), 'comments')
if (!redditError) {
this.props.global.setSuccess()
this.setState({
pushshiftCommentLookup,
removed,
deleted,
loadedAllComments,
loadingComments: false,
reloadingComments: false
})
}
})
.catch(e => this.props.global.setError(e, e.helpUrl))
})
})
.catch(e => this.props.global.setError(e, e.helpUrl))
}

View file

@ -18,14 +18,8 @@ export const fetchJson = (url, init = {}) =>
})
)
// Take on big array and split it into an array of chunks with correct size
export const chunk = (arr, size) => {
const chunks = []
for (let i = 0; i < arr.length; i += size) {
chunks.push(arr.slice(i, i + size))
}
return chunks
}
export const sleep = ms =>
new Promise(slept => setTimeout(slept, ms))
// Reddits way of indicating that something is deleted (the '\\' is for Reddit and the other is for pushshift)
export const isDeleted = textBody => textBody === '\\[deleted\\]' || textBody === '[deleted]'