Use a token bucket to rate-limit the Pushshift API

Queries to Pushshift will now be started about as quickly as
permitted, while also reducing 429 responses to a minimum.
This commit is contained in:
Christopher Gurnee 2022-02-18 22:23:34 -05:00
parent c98f71bd4d
commit 6d1600ff01

View file

@ -7,7 +7,58 @@ const commentURL = `https://api.pushshift.io/reddit/comment/search/?size=${chunk
// Returns a promise that settles (with no value) once roughly `ms`
// milliseconds have passed; intended to be awaited.
const sleep = ms =>
  new Promise(wake => {
    setTimeout(wake, ms)
  })
/**
 * Token-bucket rate limiter.
 *
 * The bucket starts full with `size` tokens. Every call to waitForToken()
 * consumes one token; tokens are refilled at a rate of one per
 * `msRefillIntvl` milliseconds, never exceeding `size`. Refills are computed
 * lazily from Date.now() on each call rather than with a background timer.
 */
class TokenBucket {

  /**
   * @param {number} msRefillIntvl - milliseconds between token refills (> 0)
   * @param {number} size - maximum number of stored tokens (> 0)
   * @throws {RangeError} if either argument is not greater than zero
   */
  constructor(msRefillIntvl, size) {
    if (!(msRefillIntvl > 0)) {
      throw new RangeError('msRefillIntvl must be > 0')
    }
    if (!(size > 0)) {
      throw new RangeError('size must be > 0')
    }
    this._msRefillIntvl = msRefillIntvl
    this._maxSize = size
    this._tokens = size
    // Invariant: this._msNextRefill is valid iff this._tokens < this._maxSize
  }

  /**
   * Removes one token, waiting for the next refill if none are available.
   *
   * @returns {Promise<void>} resolves once the caller may proceed
   */
  async waitForToken() {
    const msNow = Date.now()

    // Calculate if/how many tokens to refill
    if (this._tokens < this._maxSize && msNow >= this._msNextRefill) {
      // One token became due at _msNextRefill, plus one more for every
      // complete interval that has elapsed since then.
      const newTokens = Math.floor((msNow - this._msNextRefill) / this._msRefillIntvl) + 1
      this._tokens += newTokens
      if (this._tokens < this._maxSize) {
        this._msNextRefill += newTokens * this._msRefillIntvl
      } else {
        this._tokens = this._maxSize  // this._msNextRefill is now invalid
      }
    }

    if (this._tokens > 0) {
      // Taking a token from a full bucket starts the refill clock.
      if (this._tokens === this._maxSize) {
        this._msNextRefill = msNow + this._msRefillIntvl
      }
      this._tokens--
      return
    }

    // Bucket is empty: the token due at _msNextRefill is ours. Reserve it by
    // advancing _msNextRefill *before* suspending, so that other callers
    // arriving while this one sleeps queue up behind it instead of all
    // computing the same wake-up time.
    const msWait = this._msNextRefill - msNow
    this._msNextRefill += this._msRefillIntvl
    await sleep(msWait)
  }

  /**
   * Removes all tokens, and will refill the next token msNextAvail millis
   * from now. After it's refilled, resumes the normal refill rate.
   *
   * @param {number} msNextAvail - delay in milliseconds until the next token
   */
  setNextAvail(msNextAvail) {
    this._tokens = 0
    this._msNextRefill = Date.now() + msNextAvail
  }
}
// Module-wide limiter for Pushshift requests: holds up to 7 tokens and
// refills one every 1015 ms. Callers await waitForToken() on it before
// each fetch.
// NOTE(review): the 1015 ms / 7 values are presumably tuned to Pushshift's
// published rate limit — confirm before changing.
const pushshiftTokenBucket = new TokenBucket(1015, 7)
export const getPost = async threadID => {
await pushshiftTokenBucket.waitForToken()
try {
return (await fetchJson(`${postURL}${threadID}`)).data[0]
} catch (error) {
@ -18,22 +69,25 @@ export const getPost = async threadID => {
export const getComments = async (threadID, maxComments) => {
let chunks = Math.ceil(maxComments / chunkSize)
let after = 0, delay = 0, comments
let after = 0, comments
const allComments = new Map()
while (true) {
let delay = 0
while (true) {
await pushshiftTokenBucket.waitForToken()
try {
comments = (await fetchJson(`${commentURL}${threadID}&after=${after}`)).data
break
} catch (error) {
if (delay > 4000) {
if (delay >= 8000) { // after ~16s of consecutive failures
console.error('pushshift.getComments: ' + error)
throw new Error('Could not get removed comments')
}
delay = delay * 2 || 500
delay = delay * 2 || 125
console.log('pushshift.getComments delay: ' + delay)
pushshiftTokenBucket.setNextAvail(delay)
}
await sleep(delay)
}
comments.forEach(c => allComments.set(c.id, {
@ -45,8 +99,6 @@ export const getComments = async (threadID, maxComments) => {
break
chunks -= 1
after = Math.max(comments[comments.length - 1].created_utc - 1, after + 1)
if (delay)
await sleep(delay)
}
return allComments
}