EmoteManager/utils/emote_client.py

153 lines
4.6 KiB
Python

# © lambda#0987
# SPDX-License-Identifier: AGPL-3.0-or-later
import sys
import json
import asyncio
import aiohttp
import platform
import datetime
import urllib.parse
from typing import Dict
from http import HTTPStatus
import utils.image as image_utils
from utils.errors import RateLimitedError
from discord import HTTPException, Forbidden, NotFound, DiscordServerError
class GuildRetryTimes:
    """Per-guild record of when rate limits lift, with one slot per tracked HTTP method.

    Each slot holds either ``None`` (no pending limit known) or a POSIX timestamp;
    requests using that method should not be attempted before that time.
    """

    __slots__ = frozenset({'POST', 'DELETE'})

    def __init__(self, POST=None, DELETE=None):
        self.POST, self.DELETE = POST, DELETE

    def validate(self):
        """Clear every deadline that has already passed.

        Returns a truthy deadline while at least one limit is still pending
        (the POST deadline takes precedence), otherwise a falsy value.
        """
        current = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
        for http_method in ('POST', 'DELETE'):
            deadline = getattr(self, http_method)
            if deadline and deadline < current:
                # this limit has elapsed, so forget it
                setattr(self, http_method, None)
        return self.POST or self.DELETE
# Alias for guild IDs, used to annotate the keys of the per-guild rate limit cache.
GuildId = int
async def json_or_text(resp):
    """Read a response body, decoding it as JSON when the server labels it as JSON.

    Parameters:
        resp: a response object exposing an awaitable ``text(encoding=...)`` method
            and a ``headers`` mapping.

    Returns:
        The parsed JSON value when the Content-Type media type is ``application/json``,
        otherwise the body as a string.

    Raises:
        ValueError: (``json.JSONDecodeError``) if the body is labelled as JSON but does not parse.
    """
    text = await resp.text(encoding='utf-8')
    try:
        content_type = resp.headers['content-type']
    except KeyError:
        # Thanks Cloudflare
        return text

    # Compare only the media type: the header value may carry parameters
    # (e.g. "; charset=utf-8") and media types are case-insensitive.
    media_type = content_type.partition(';')[0].strip().lower()
    if media_type == 'application/json':
        return json.loads(text)
    return text
class EmoteClient:
    """HTTP client for the guild emoji create/delete endpoints.

    It keeps its own per-guild record of rate limits so that callers can find out
    that a request would be refused before doing any expensive work.
    """

    BASE_URL = 'https://discord.com/api/v7'

    # Status codes that map to a more specific exception than HTTPException.
    HTTP_ERROR_CLASSES = {
        HTTPStatus.FORBIDDEN: Forbidden,
        HTTPStatus.NOT_FOUND: NotFound,
        HTTPStatus.SERVICE_UNAVAILABLE: DiscordServerError,
    }

    def __init__(self, bot):
        """Build the HTTP session from the bot's configuration.

        Reads ``bot.config['user_agent']``, ``bot.http.user_agent`` and
        ``bot.config['tokens']['discord']``, and keeps a reference to ``bot._connection``.
        """
        self.guild_rls: Dict[GuildId, GuildRetryTimes] = {}
        self.http = aiohttp.ClientSession(headers={
            'User-Agent': bot.config['user_agent'] + ' ' + bot.http.user_agent,
            'Authorization': 'Bot ' + bot.config['tokens']['discord'],
            'X-Ratelimit-Precision': 'millisecond',
        })
        # internals 🤫
        self._connection_state = bot._connection

    async def request(self, method, path, guild_id, **kwargs):
        """Send one API request, honouring and recording per-guild rate limits.

        Parameters:
            method: HTTP method name, e.g. ``'POST'``.
            path: request path, appended to ``BASE_URL``.
            guild_id: key under which rate limits for this request are tracked.
            reason: optional keyword; sent URL-quoted as the ``X-Audit-Log-Reason`` header.
            headers: optional keyword; extra headers merged into the request.
            **kwargs: everything else is forwarded to the aiohttp session's ``request``.

        Returns:
            The decoded response body (see ``json_or_text``) for a 2xx status.

        Raises:
            RateLimitedError: a cached rate limit for this method is still pending,
                or the server imposed a long rate limit.
            Forbidden, NotFound, DiscordServerError, HTTPException: for non-2xx
                statuses, chosen via ``HTTP_ERROR_CLASSES``.
        """
        self._check_rl(method, guild_id)

        # Start from any headers already present instead of discarding them:
        # _handle_rl replays the request with the kwargs built below, in which
        # `reason` has been popped and survives only as the audit log header.
        headers = dict(kwargs.pop('headers', None) or {})
        # Emote Manager shouldn't use walrus op until Debian adopts 3.8 :(
        reason = kwargs.pop('reason', None)
        if reason:
            headers['X-Audit-Log-Reason'] = urllib.parse.quote(reason, safe='/ ')
        kwargs['headers'] = headers

        # TODO handle OSError and 500/502, like dpy does
        async with self.http.request(method, self.BASE_URL + path, **kwargs) as resp:
            if resp.status == HTTPStatus.TOO_MANY_REQUESTS:
                return await self._handle_rl(resp, method, path, guild_id, **kwargs)

            data = await json_or_text(resp)
            if resp.status in range(200, 300):
                return data

            error_cls = self.HTTP_ERROR_CLASSES.get(resp.status, HTTPException)
            raise error_cls(resp, data)

    def _check_rl(self, method, guild_id):
        """Raise RateLimitedError if a cached rate limit for ``method`` is still pending.

        Expired entries are dropped from the cache as a side effect. The error is
        raised with the cached timestamp.
        """
        try:
            rls = self.guild_rls[guild_id]
        except KeyError:
            return

        if not rls.validate():
            # nothing pending for this guild any more
            del self.guild_rls[guild_id]
            return

        retry_at = getattr(rls, method, None)
        if retry_at:
            raise RateLimitedError(retry_at)

    async def _handle_rl(self, resp, method, path, guild_id, **kwargs):
        """Deal with a 429 response: wait out short limits, refuse long ones.

        For a tracked method (one named in ``GuildRetryTimes.__slots__``) the deadline
        is cached. Untracked methods, and limits under ten seconds, are slept through
        and the request is retried. Otherwise RateLimitedError is raised with the
        datetime at which the limit lifts.
        """
        # the body's retry_after is treated as milliseconds
        retry_after = (await resp.json())['retry_after'] / 1000.0
        retry_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=retry_after)

        # GuildRetryTimes only has slots for certain methods; assigning any other
        # attribute would raise AttributeError, so check before caching.
        tracked = resp.method in GuildRetryTimes.__slots__
        if tracked:
            # cache unconditionally in case request() is called again while we're sleeping
            try:
                rls = self.guild_rls[guild_id]
            except KeyError:
                self.guild_rls[guild_id] = rls = GuildRetryTimes()
            setattr(rls, resp.method, retry_at.timestamp())

        if not tracked or retry_after < 10.0:
            await asyncio.sleep(retry_after)
            # woo mutual recursion
            return await self.request(method, path, guild_id, **kwargs)

        # we've been hit with one of those crazy high rate limits, which only occur for specific methods
        raise RateLimitedError(retry_at)

    # optimization methods that let us check the RLs before downloading the user's image

    def check_create(self, guild_id):
        """Raise RateLimitedError if a POST rate limit is pending for the guild."""
        self._check_rl('POST', guild_id)

    def check_delete(self, guild_id):
        """Raise RateLimitedError if a DELETE rate limit is pending for the guild."""
        self._check_rl('DELETE', guild_id)

    async def create(self, *, guild, name, image: bytes, role_ids=(), reason=None):
        """Create an emote in ``guild`` from raw image bytes.

        Returns whatever the connection state's ``store_emoji`` returns for the
        API response.
        """
        data = await self.request(
            'POST', f'/guilds/{guild.id}/emojis',
            guild.id,
            json=dict(name=name, image=image_utils.image_to_base64_url(image), roles=role_ids),
            reason=reason,
        )
        # internals 🤫
        # this is A) so we can return a bona-fide, authentic, Emoji object,
        # and B) because it's what dpy does to keep the emoji cache up to date
        return self._connection_state.store_emoji(guild, data)

    async def delete(self, *, guild_id, emote_id, reason=None):
        """Delete the emote ``emote_id`` from the guild ``guild_id``."""
        return await self.request('DELETE', f'/guilds/{guild_id}/emojis/{emote_id}', guild_id, reason=reason)

    async def __aenter__(self):
        self.http = await self.http.__aenter__()
        return self

    async def __aexit__(self, *excinfo):
        return await self.http.__aexit__(*excinfo)

    async def close(self):
        """Close the underlying HTTP session."""
        return await self.http.close()