Track pending token requests to limit concurrency
This commit is contained in:
parent
f9c9b0d3a4
commit
34964f9e56
3 changed files with 32 additions and 19 deletions
|
@ -72,15 +72,12 @@ proc fetch*(url: Uri; api: Api): Future[JsonNode] {.async.} =
|
|||
remaining = parseInt(resp.headers[rlRemaining])
|
||||
reset = parseInt(resp.headers[rlReset])
|
||||
token.setRateLimit(api, remaining, reset)
|
||||
echo api, " ", remaining, " ", url.path
|
||||
else:
|
||||
echo api, " ", url.path
|
||||
|
||||
if result.getError notin {invalidToken, forbidden, badToken}:
|
||||
token.lastUse = getTime()
|
||||
release(token, used=true)
|
||||
else:
|
||||
echo "fetch error: ", result.getError
|
||||
release(token, true)
|
||||
release(token, invalid=true)
|
||||
raise rateLimitError()
|
||||
|
||||
if resp.status == $Http400:
|
||||
|
@ -90,5 +87,5 @@ proc fetch*(url: Uri; api: Api): Future[JsonNode] {.async.} =
|
|||
except Exception as e:
|
||||
echo "error: ", e.name, ", msg: ", e.msg, ", token: ", token[], ", url: ", url
|
||||
if "length" notin e.msg and "descriptor" notin e.msg:
|
||||
release(token, true)
|
||||
release(token, invalid=true)
|
||||
raise rateLimitError()
|
||||
|
|
|
@ -5,6 +5,7 @@ import zippy
|
|||
import types, agents, consts, http_pool
|
||||
|
||||
const
|
||||
maxConcurrentReqs = 5 # max requests at a time per token, to avoid race conditions
|
||||
maxAge = 3.hours # tokens expire after 3 hours
|
||||
maxLastUse = 1.hours # if a token is unused for 60 minutes, it expires
|
||||
failDelay = initDuration(minutes=30)
|
||||
|
@ -19,6 +20,7 @@ proc getPoolJson*: string =
|
|||
for token in tokenPool:
|
||||
list[token.tok] = %*{
|
||||
"apis": newJObject(),
|
||||
"pending": token.pending,
|
||||
"init": $token.init,
|
||||
"lastUse": $token.lastUse
|
||||
}
|
||||
|
@ -69,35 +71,48 @@ proc isLimited(token: Token; api: Api): bool =
|
|||
|
||||
if api in token.apis:
|
||||
let limit = token.apis[api]
|
||||
return (limit.remaining <= 5 and limit.reset > getTime())
|
||||
return (limit.remaining <= 10 and limit.reset > getTime())
|
||||
else:
|
||||
return false
|
||||
|
||||
proc release*(token: Token; invalid=false) =
|
||||
if not token.isNil and (invalid or token.expired):
|
||||
proc isReady(token: Token; api: Api): bool =
|
||||
not (token.isNil or token.pending > maxConcurrentReqs or token.isLimited(api))
|
||||
|
||||
proc release*(token: Token; used=false; invalid=false) =
|
||||
if token.isNil: return
|
||||
if invalid or token.expired:
|
||||
let idx = tokenPool.find(token)
|
||||
if idx > -1: tokenPool.delete(idx)
|
||||
elif used:
|
||||
dec token.pending
|
||||
token.lastUse = getTime()
|
||||
|
||||
proc getToken*(api: Api): Future[Token] {.async.} =
|
||||
for i in 0 ..< tokenPool.len:
|
||||
if not (result.isNil or result.isLimited(api)):
|
||||
break
|
||||
if result.isReady(api): break
|
||||
release(result)
|
||||
result = tokenPool.sample()
|
||||
|
||||
if result.isNil or result.isLimited(api):
|
||||
if not result.isReady(api):
|
||||
release(result)
|
||||
result = await fetchToken()
|
||||
tokenPool.add result
|
||||
|
||||
if result.isNil:
|
||||
if not result.isNil:
|
||||
inc result.pending
|
||||
else:
|
||||
raise rateLimitError()
|
||||
|
||||
proc setRateLimit*(token: Token; api: Api; remaining, reset: int) =
|
||||
token.apis[api] = RateLimit(
|
||||
remaining: remaining,
|
||||
reset: fromUnix(reset)
|
||||
)
|
||||
let reset = fromUnix(reset)
|
||||
|
||||
# avoid undefined behavior in race conditions
|
||||
if api in token.apis:
|
||||
let limit = token.apis[api]
|
||||
if limit.reset >= reset and limit.remaining < remaining:
|
||||
return
|
||||
|
||||
token.apis[api] = RateLimit(remaining: remaining, reset: reset)
|
||||
|
||||
proc poolTokens*(amount: int) {.async.} =
|
||||
var futs: seq[Future[Token]]
|
||||
|
|
|
@ -26,6 +26,7 @@ type
|
|||
tok*: string
|
||||
init*: Time
|
||||
lastUse*: Time
|
||||
pending*: int
|
||||
apis*: Table[Api, RateLimit]
|
||||
|
||||
Error* = enum
|
||||
|
|
Loading…
Reference in a new issue