mirror of
https://github.com/gurnec/removeddit.git
synced 2026-03-11 08:54:27 +00:00
Use a token bucket to rate-limit the Pushshift API
Queries to Pushshift will now be started about as quickly as permitted, while also reducing 429 responses to a minimum.
This commit is contained in:
parent
c98f71bd4d
commit
6d1600ff01
1 changed files with 58 additions and 6 deletions
|
|
@ -7,7 +7,58 @@ const commentURL = `https://api.pushshift.io/reddit/comment/search/?size=${chunk
|
|||
const sleep = ms =>
|
||||
new Promise(slept => setTimeout(slept, ms))
|
||||
|
||||
class TokenBucket {
|
||||
|
||||
// Refills tokens at a rate of one per msRefillIntvl millis, storing up to size tokens.
|
||||
constructor(msRefillIntvl, size) {
|
||||
if (!(msRefillIntvl > 0))
|
||||
throw RangeError('msRefillIntvl must be > 0')
|
||||
if (!(size > 0))
|
||||
throw RangeError('size must be > 0')
|
||||
this._msRefillIntvl = msRefillIntvl
|
||||
this._maxSize = size
|
||||
this._tokens = size
|
||||
// Invariant: this._msNextRefill is valid iff this._tokens < this._maxSize
|
||||
}
|
||||
|
||||
// Removes one token, waiting for it to refill if none are available.
|
||||
async waitForToken() {
|
||||
let msNow
|
||||
// Calculate if/how many tokens to refill
|
||||
if (this._tokens < this._maxSize) { // this._msNextRefill is valid
|
||||
msNow = Date.now()
|
||||
if (msNow >= this._msNextRefill) {
|
||||
const newTokens = Math.floor((msNow - this._msNextRefill) / this._msRefillIntvl) + 1
|
||||
this._tokens += newTokens
|
||||
if (this._tokens < this._maxSize)
|
||||
this._msNextRefill += newTokens * this._msRefillIntvl
|
||||
else
|
||||
this._tokens = this._maxSize // this._msNextRefill is now invalid
|
||||
}
|
||||
}
|
||||
// Remove a token or wait for _msNextRefill, and recalculate it
|
||||
if (this._tokens > 0) {
|
||||
if (this._tokens == this._maxSize) // this._msNextRefill is invalid,
|
||||
this._msNextRefill = (msNow || Date.now()) + this._msRefillIntvl // make it valid
|
||||
this._tokens--
|
||||
} else { // this._msNextRefill is valid and msNow has already been set above
|
||||
await sleep(this._msNextRefill - msNow)
|
||||
this._msNextRefill += this._msRefillIntvl
|
||||
}
|
||||
}
|
||||
|
||||
// Removes all tokens, and will refill the next token msNextAvail
|
||||
// millis from now. After it's refilled, resumes normal refill rate.
|
||||
setNextAvail(msNextAvail) {
|
||||
this.tokens = 0
|
||||
this._msNextRefill = Date.now() + msNextAvail
|
||||
}
|
||||
}
|
||||
|
||||
const pushshiftTokenBucket = new TokenBucket(1015, 7)
|
||||
|
||||
export const getPost = async threadID => {
|
||||
await pushshiftTokenBucket.waitForToken()
|
||||
try {
|
||||
return (await fetchJson(`${postURL}${threadID}`)).data[0]
|
||||
} catch (error) {
|
||||
|
|
@ -18,22 +69,25 @@ export const getPost = async threadID => {
|
|||
|
||||
export const getComments = async (threadID, maxComments) => {
|
||||
let chunks = Math.ceil(maxComments / chunkSize)
|
||||
let after = 0, delay = 0, comments
|
||||
let after = 0, comments
|
||||
const allComments = new Map()
|
||||
while (true) {
|
||||
|
||||
let delay = 0
|
||||
while (true) {
|
||||
await pushshiftTokenBucket.waitForToken()
|
||||
try {
|
||||
comments = (await fetchJson(`${commentURL}${threadID}&after=${after}`)).data
|
||||
break
|
||||
} catch (error) {
|
||||
if (delay > 4000) {
|
||||
if (delay >= 8000) { // after ~16s of consecutive failures
|
||||
console.error('pushshift.getComments: ' + error)
|
||||
throw new Error('Could not get removed comments')
|
||||
}
|
||||
delay = delay * 2 || 500
|
||||
delay = delay * 2 || 125
|
||||
console.log('pushshift.getComments delay: ' + delay)
|
||||
pushshiftTokenBucket.setNextAvail(delay)
|
||||
}
|
||||
await sleep(delay)
|
||||
}
|
||||
|
||||
comments.forEach(c => allComments.set(c.id, {
|
||||
|
|
@ -45,8 +99,6 @@ export const getComments = async (threadID, maxComments) => {
|
|||
break
|
||||
chunks -= 1
|
||||
after = Math.max(comments[comments.length - 1].created_utc - 1, after + 1)
|
||||
if (delay)
|
||||
await sleep(delay)
|
||||
}
|
||||
return allComments
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue