1
0
Fork 0
mirror of https://github.com/uhIgnacio/EmoteManager.git synced 2024-08-15 02:23:13 +00:00
EmoteManager/utils/emote_client.py
2021-06-23 14:39:02 +00:00

148 lines
4.5 KiB
Python

# © lambda#0987
# SPDX-License-Identifier: AGPL-3.0-or-later
import sys
import json
import asyncio
import aiohttp
import platform
import datetime
import urllib.parse
from typing import Dict
from http import HTTPStatus
from discord import PartialEmoji
import utils.image as image_utils
from utils.errors import RateLimitedError
from discord import HTTPException, Forbidden, NotFound, DiscordServerError
class GuildRetryTimes:
    """Per-guild record of when each rate-limited HTTP method may be used again.

    Each attribute is either ``None`` (no rate limit recorded) or a UTC POSIX
    timestamp; requests using that method should wait until that time.
    """

    __slots__ = frozenset({'POST', 'DELETE'})

    def __init__(self, POST=None, DELETE=None):
        self.POST, self.DELETE = POST, DELETE

    def validate(self):
        """Forget every retry time that already lies in the past.

        Returns a truthy value (one of the remaining timestamps) while at least
        one rate limit is still pending, and a falsy value otherwise.
        """
        current = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
        for method_name in ('POST', 'DELETE'):
            pending = getattr(self, method_name)
            if pending and pending < current:
                setattr(self, method_name, None)
        return self.POST or self.DELETE
# Alias used in annotations to mark integers that are guild IDs
# (e.g. the keys of EmoteClient.guild_rls).
GuildId = int
async def json_or_text(resp):
    """Return the response body, parsed as JSON when the response declares JSON.

    The body is always read as UTF-8 text. If the ``content-type`` header names
    the ``application/json`` media type (compared case-insensitively and
    ignoring parameters such as ``; charset=utf-8``), the parsed JSON value is
    returned. Otherwise, or when the header is missing, the raw text is returned.

    Raises whatever ``json.loads`` raises if a response declared as JSON has a
    malformed body.
    """
    text = await resp.text(encoding='utf-8')
    try:
        content_type = resp.headers['content-type']
    except KeyError:
        # Thanks Cloudflare
        return text

    # Only the media type itself matters; drop parameters like "; charset=utf-8".
    media_type = content_type.partition(';')[0].strip().lower()
    if media_type == 'application/json':
        return json.loads(text)
    return text
class EmoteClient:
    """Small Discord HTTP client for creating and deleting guild emotes.

    Rate limits are remembered per guild and per HTTP method (see GuildRetryTimes),
    so that a request which is known to be rate limited fails immediately with
    RateLimitedError instead of being sent.
    """

    BASE_URL = 'https://discord.com/api/v7'

    # Exception classes raised for specific non-2xx statuses;
    # any other non-2xx status raises HTTPException.
    HTTP_ERROR_CLASSES = {
        HTTPStatus.FORBIDDEN: Forbidden,
        HTTPStatus.NOT_FOUND: NotFound,
        HTTPStatus.SERVICE_UNAVAILABLE: DiscordServerError,
    }

    def __init__(self, bot):
        """Create the HTTP session.

        bot must provide config['user_agent'], config['tokens']['discord'] and http.user_agent.
        """
        self.guild_rls: Dict[GuildId, GuildRetryTimes] = {}
        self.http = aiohttp.ClientSession(headers={
            'User-Agent': bot.config['user_agent'] + ' ' + bot.http.user_agent,
            'Authorization': 'Bot ' + bot.config['tokens']['discord'],
            'X-Ratelimit-Precision': 'millisecond',
        })

    async def request(self, method, path, guild_id, **kwargs):
        """Send a request to BASE_URL + path and return the decoded response body.

        method: HTTP method name, e.g. 'POST' or 'DELETE'.
        path: path appended to BASE_URL.
        guild_id: guild whose cached rate limits apply to this request.
        reason (keyword, optional): sent URL-quoted as the X-Audit-Log-Reason header.
        headers (keyword, optional): extra headers, merged with the reason header.
        Remaining keyword arguments are forwarded to the aiohttp session's request().

        Returns the JSON-decoded body (or the raw text) of a 2xx response.
        Raises RateLimitedError if the guild is known to be rate limited for this method,
        or an HTTPException (sub)class for any other non-2xx status.
        """
        self._check_rl(method, guild_id)

        # Start from the headers already in kwargs instead of replacing them: _handle_rl
        # re-enters this method with the first attempt's kwargs, in which `reason` has
        # already been moved into the headers. Replacing them would drop the audit log
        # reason on every retry.
        headers = dict(kwargs.pop('headers', None) or {})
        # Emote Manager shouldn't use walrus op until Debian adopts 3.8 :(
        reason = kwargs.pop('reason', None)
        if reason:
            headers['X-Audit-Log-Reason'] = urllib.parse.quote(reason, safe='/ ')
        kwargs['headers'] = headers

        # TODO handle OSError and 500/502, like dpy does
        async with self.http.request(method, self.BASE_URL + path, **kwargs) as resp:
            if resp.status == HTTPStatus.TOO_MANY_REQUESTS:
                return await self._handle_rl(resp, method, path, guild_id, **kwargs)

            data = await json_or_text(resp)
            if resp.status in range(200, 300):
                return data

            error_cls = self.HTTP_ERROR_CLASSES.get(resp.status, HTTPException)
            raise error_cls(resp, data)

    def _check_rl(self, method, guild_id):
        """Raise RateLimitedError if a rate limit for (guild_id, method) is still pending.

        Entries whose retry times have all elapsed are removed from the cache.
        """
        try:
            rls = self.guild_rls[guild_id]
        except KeyError:
            return

        if not rls.validate():
            del self.guild_rls[guild_id]
            return

        retry_at = getattr(rls, method, None)
        if retry_at:
            raise RateLimitedError(retry_at)

    async def _handle_rl(self, resp, method, path, guild_id, **kwargs):
        """Handle a 429 response: wait and retry, or raise RateLimitedError.

        Short rate limits (under 10 seconds), and rate limits on methods that
        GuildRetryTimes does not track, are waited out and the request is retried.
        Longer rate limits on tracked methods raise RateLimitedError.
        """
        # the body's retry_after is treated as milliseconds
        retry_after = (await resp.json())['retry_after'] / 1000.0
        retry_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=retry_after)

        # GuildRetryTimes only has slots for some methods; assigning any other
        # attribute raises AttributeError, so only tracked methods are cached.
        tracked = resp.method in GuildRetryTimes.__slots__
        if tracked:
            # cache unconditionally in case request() is called again while we're sleeping
            try:
                rls = self.guild_rls[guild_id]
            except KeyError:
                self.guild_rls[guild_id] = rls = GuildRetryTimes()
            setattr(rls, resp.method, retry_at.timestamp())

        if not tracked or retry_after < 10.0:
            await asyncio.sleep(retry_after)
            # woo mutual recursion
            return await self.request(method, path, guild_id, **kwargs)

        # we've been hit with one of those crazy high rate limits, which only occur for specific methods
        raise RateLimitedError(retry_at)

    # optimization methods that let us check the RLs before downloading the user's image

    def check_create(self, guild_id):
        """Raise RateLimitedError if emote creation in this guild is currently rate limited."""
        self._check_rl('POST', guild_id)

    def check_delete(self, guild_id):
        """Raise RateLimitedError if emote deletion in this guild is currently rate limited."""
        self._check_rl('DELETE', guild_id)

    async def create(self, *, guild, name, image: bytes, role_ids=(), reason=None):
        """Create an emote in guild from raw image bytes.

        Returns a PartialEmoji built from the 'animated', 'name' and 'id' fields of the response.
        """
        data = await self.request(
            'POST', f'/guilds/{guild.id}/emojis',
            guild.id,
            json=dict(name=name, image=image_utils.image_to_base64_url(image), roles=role_ids),
            reason=reason,
        )
        # NOTE(review): 'id' is passed through exactly as it appears in the response;
        # confirm whether callers expect it to be converted to int.
        return PartialEmoji(animated=data.get('animated', False), name=data.get('name'), id=data.get('id'))

    async def delete(self, *, guild_id, emote_id, reason=None):
        """Delete the emote with ID emote_id from the guild with ID guild_id."""
        return await self.request('DELETE', f'/guilds/{guild_id}/emojis/{emote_id}', guild_id, reason=reason)

    async def __aenter__(self):
        """Enter the underlying HTTP session's context and return this client."""
        self.http = await self.http.__aenter__()
        return self

    async def __aexit__(self, *excinfo):
        """Leave the underlying HTTP session's context."""
        return await self.http.__aexit__(*excinfo)

    async def close(self):
        """Close the underlying HTTP session."""
        return await self.http.close()