# © lambda#0987 # SPDX-License-Identifier: AGPL-3.0-or-later import sys import json import asyncio import aiohttp import platform import datetime import urllib.parse from typing import Dict from http import HTTPStatus from discord import PartialEmoji import utils.image as image_utils from utils.errors import RateLimitedError from discord import HTTPException, Forbidden, NotFound, DiscordServerError class GuildRetryTimes: """Holds the times, for a particular guild, that we have to wait until for the rate limit for a particular HTTP method to elapse. """ __slots__ = frozenset({'POST', 'DELETE'}) def __init__(self, POST=None, DELETE=None): self.POST = POST self.DELETE = DELETE def validate(self): now = datetime.datetime.now(tz=datetime.timezone.utc).timestamp() if self.POST and self.POST < now: self.POST = None if self.DELETE and self.DELETE < now: self.DELETE = None return self.POST or self.DELETE GuildId = int async def json_or_text(resp): text = await resp.text(encoding='utf-8') try: if resp.headers['content-type'] == 'application/json': return json.loads(text) except KeyError: # Thanks Cloudflare pass return text class EmoteClient: BASE_URL = 'https://discord.com/api/v7' HTTP_ERROR_CLASSES = { HTTPStatus.FORBIDDEN: Forbidden, HTTPStatus.NOT_FOUND: NotFound, HTTPStatus.SERVICE_UNAVAILABLE: DiscordServerError, } def __init__(self, bot): self.guild_rls: Dict[GuildId, GuildRetryTimes] = {} self.http = aiohttp.ClientSession(headers={ 'User-Agent': bot.config['user_agent'] + ' ' + bot.http.user_agent, 'Authorization': 'Bot ' + bot.config['tokens']['discord'], 'X-Ratelimit-Precision': 'millisecond', }) async def request(self, method, path, guild_id, **kwargs): self._check_rl(method, guild_id) headers = {} # Emote Manager shouldn't use walrus op until Debian adopts 3.8 :( reason = kwargs.pop('reason', None) if reason: headers['X-Audit-Log-Reason'] = urllib.parse.quote(reason, safe='/ ') kwargs['headers'] = headers # TODO handle OSError and 500/502, like dpy does async with self.http.request(method, self.BASE_URL + path, **kwargs) as resp: if resp.status == HTTPStatus.TOO_MANY_REQUESTS: return await self._handle_rl(resp, method, path, guild_id, **kwargs) data = await json_or_text(resp) if resp.status in range(200, 300): return data error_cls = self.HTTP_ERROR_CLASSES.get(resp.status, HTTPException) raise error_cls(resp, data) def _check_rl(self, method, guild_id): try: rls = self.guild_rls[guild_id] except KeyError: return if not rls.validate(): del self.guild_rls[guild_id] return retry_at = getattr(rls, method, None) if retry_at: raise RateLimitedError(retry_at) async def _handle_rl(self, resp, method, path, guild_id, **kwargs): retry_after = (await resp.json())['retry_after'] / 1000.0 retry_at = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(seconds=retry_after) # cache unconditionally in case request() is called again while we're sleeping try: rls = self.guild_rls[guild_id] except KeyError: self.guild_rls[guild_id] = rls = GuildRetryTimes() setattr(rls, resp.method, retry_at.timestamp()) if resp.method not in GuildRetryTimes.__slots__ or retry_after < 10.0: await asyncio.sleep(retry_after) # woo mutual recursion return await self.request(method, path, guild_id, **kwargs) # we've been hit with one of those crazy high rate limits, which only occur for specific methods raise RateLimitedError(retry_at) # optimization methods that let us check the RLs before downloading the user's image def check_create(self, guild_id): self._check_rl('POST', guild_id) def check_delete(self, guild_id): self._check_rl('DELETE', guild_id) async def create(self, *, guild, name, image: bytes, role_ids=(), reason=None): data = await self.request( 'POST', f'/guilds/{guild.id}/emojis', guild.id, json=dict(name=name, image=image_utils.image_to_base64_url(image), roles=role_ids), reason=reason, ) return PartialEmoji(animated=data.get('animated', False), name=data.get('name'), id=data.get('id')) async def delete(self, *, guild_id, emote_id, reason=None): return await self.request('DELETE', f'/guilds/{guild_id}/emojis/{emote_id}', guild_id, reason=reason) async def __aenter__(self): self.http = await self.http.__aenter__() return self async def __aexit__(self, *excinfo): return await self.http.__aexit__(*excinfo) async def close(self): return await self.http.close()