diff --git a/bot.py b/bot.py index f5be795..756e82d 100755 --- a/bot.py +++ b/bot.py @@ -3,6 +3,8 @@ # © lambda#0987 # SPDX-License-Identifier: AGPL-3.0-or-later +import sys +import asyncio import base64 import logging import traceback @@ -17,74 +19,78 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # SelectorEventLoop on windows doesn't support subprocesses lol -import asyncio -import sys if sys.platform == 'win32': - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + class Bot(Bot): - startup_extensions = ( - 'cogs.emote', - 'cogs.meta', - 'bot_bin.debug', - 'bot_bin.misc', - 'bot_bin.systemd', - 'jishaku', - ) + startup_extensions = ( + 'cogs.emote', + 'cogs.meta', + 'bot_bin.debug', + 'bot_bin.misc', + 'bot_bin.systemd', + 'jishaku', + ) - def __init__(self, **kwargs): - with open('data/config.py', encoding='utf-8') as f: - config = eval(f.read(), {}) + def __init__(self, **kwargs): + with open('data/config.py', encoding='utf-8') as f: + config = eval(f.read(), {}) - super().__init__(config=config, **kwargs) - # allow use of the bot's user ID before ready() - token_part0 = self.config['tokens']['discord'].partition('.')[0].encode() - self.user_id = int(base64.b64decode(token_part0 + b'=' * (3 - len(token_part0) % 3))) + super().__init__(config=config, **kwargs) + # allow use of the bot's user ID before ready() + token_part0 = self.config['tokens']['discord'].partition('.')[ + 0].encode() + self.user_id = int(base64.b64decode( + token_part0 + b'=' * (3 - len(token_part0) % 3))) + + def process_config(self): + """Load the emojis from the config to be used when a command fails or succeeds + We do it this way so that they can be used anywhere instead of requiring a bot instance. + """ + super().process_config() + import utils.misc + default = ('❌', '✅') + utils.SUCCESS_EMOJIS = utils.misc.SUCCESS_EMOJIS = ( + self.config.get('response_emojis', {}).get('success', default)) - def process_config(self): - """Load the emojis from the config to be used when a command fails or succeeds - We do it this way so that they can be used anywhere instead of requiring a bot instance. - """ - super().process_config() - import utils.misc - default = ('❌', '✅') - utils.SUCCESS_EMOJIS = utils.misc.SUCCESS_EMOJIS = ( - self.config.get('response_emojis', {}).get('success', default)) def main(): - import sys + import sys - if len(sys.argv) == 1: - shard_count = None - shard_ids = None - elif len(sys.argv) < 3: - print('Usage:', sys.argv[0], '[ ]', file=sys.stderr) - sys.exit(1) - else: - shard_count = int(sys.argv[1]) - shard_ids = list(map(int, sys.argv[2].split('-'))) + if len(sys.argv) == 1: + shard_count = None + shard_ids = None + elif len(sys.argv) < 3: + print( + 'Usage:', sys.argv[0], '[ ]', file=sys.stderr) + sys.exit(1) + else: + shard_count = int(sys.argv[1]) + shard_ids = list(map(int, sys.argv[2].split('-'))) - Bot( - intents=discord.Intents( - guilds=True, - # we hardly need DM support but it's helpful to be able to run the help/support commands in DMs - messages=True, - # we don't need DM reactions because we don't ever paginate in DMs - guild_reactions=True, - emojis=True, - # everything else, including `members` and `presences`, is implicitly false. - ), + Bot( + intents=nextcord.Intents( + guilds=True, + # we hardly need DM support but it's helpful to be able to run the help/support commands in DMs + messages=True, + # we don't need DM reactions because we don't ever paginate in DMs + guild_reactions=True, + emojis=True, + # everything else, including `members` and `presences`, is implicitly false. + ), - # the least stateful bot you will ever see 😎 - chunk_guilds_at_startup=False, - member_cache_flags=discord.MemberCacheFlags.none(), - # disable message cache - max_messages=None, + # the least stateful bot you will ever see 😎 + chunk_guilds_at_startup=False, + member_cache_flags=nextcord.MemberCacheFlags.none(), + # disable message cache + max_messages=None, + + shard_count=shard_count, + shard_ids=shard_ids, + ).run() - shard_count=shard_count, - shard_ids=shard_ids, - ).run() if __name__ == '__main__': - main() + main() diff --git a/cogs/emote.py b/cogs/emote.py index 54a225f..d31e174 100644 --- a/cogs/emote.py +++ b/cogs/emote.py @@ -18,9 +18,9 @@ import warnings import weakref import aiohttp -import discord +import nextcord import humanize -from discord.ext import commands +from nextcord.ext import commands import utils import utils.image @@ -32,554 +32,578 @@ from utils.converter import emote_type_filter_default logger = logging.getLogger(__name__) # guilds can have duplicate emotes, so let us create zips to match -warnings.filterwarnings('ignore', module='zipfile', category=UserWarning, message=r"^Duplicate name: .*$") +warnings.filterwarnings('ignore', module='zipfile', + category=UserWarning, message=r"^Duplicate name: .*$") + class UserCancelledError(commands.UserInputError): - pass + pass + class Emotes(commands.Cog): - IMAGE_MIMETYPES = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'} - # TAR_MIMETYPES = {'application/x-tar', 'application/x-xz', 'application/gzip', 'application/x-bzip2'} - TAR_MIMETYPES = {'application/x-tar'} - ZIP_MIMETYPES = {'application/zip', 'application/octet-stream', 'application/x-zip-compressed', 'multipart/x-zip'} - ARCHIVE_MIMETYPES = TAR_MIMETYPES | ZIP_MIMETYPES - ZIP_OVERHEAD_BYTES = 30 - - def __init__(self, bot): - self.bot = bot - - connector = None - socks5_url = self.bot.config.get('socks5_proxy_url') - if socks5_url: - from aiohttp_socks import SocksConnector - connector = SocksConnector.from_url(socks5_url, rdns=True) - - self.http = aiohttp.ClientSession( - loop=self.bot.loop, - read_timeout=self.bot.config.get('http_read_timeout', 60), - connector=connector if self.bot.config.get('use_socks5_for_all_connections') else None, - headers={ - 'User-Agent': - self.bot.config['user_agent'] + ' ' - + self.bot.http.user_agent - }) - - self.emote_client = EmoteClient(self.bot) - - with open('data/ec-emotes-final.json') as f: - self.ec_emotes = json.load(f) - - # keep track of paginators so we can end them when the cog is unloaded - self.paginators = weakref.WeakSet() - - def cog_unload(self): - async def close(): - await self.http.close() - await self.emote_client.close() - - for paginator in self.paginators: - await paginator.stop() - - self.bot.loop.create_task(close()) - - public_commands = set() - def public(command, public_commands=public_commands): # resolve some kinda scope issue that i don't understand - public_commands.add(command.qualified_name) - return command - - async def cog_check(self, context): - if not context.guild: - raise commands.NoPrivateMessage - - # we can't just do `context.command in self.public_commands` here - # because apparently Command.__eq__ is not defined - if context.command.qualified_name in self.public_commands: - return True - - if ( - not context.author.guild_permissions.manage_emojis - or not context.guild.me.guild_permissions.manage_emojis - ): - raise errors.MissingManageEmojisPermission - - return True - - @commands.Cog.listener() - async def on_command_error(self, context, error): - if isinstance(error, errors.EmoteManagerError): - await context.send(error) - - if isinstance(error, commands.NoPrivateMessage): - await context.send( - f'{utils.SUCCESS_EMOJIS[False]} Sorry, this command may only be used in a server.') - - @commands.command(usage='[name] ') - async def add(self, context, *args): - """Add a new emote to this server. - - You can use it like this: - `add :thonkang:` (if you already have that emote) - `add rollsafe https://image.noelshack.com/fichiers/2017/06/1486495269-rollsafe.png` - `add speedtest ` - - With a file attachment: - `add name` will upload a new emote using the first attachment as the image and call it `name` - `add` will upload a new emote using the first attachment as the image, - and its filename as the name - """ - name, url = self.parse_add_command_args(context, args) - async with context.typing(): - message = await self.add_safe(context, name, url) - await context.send(message) - - @commands.command(name='add-these') - async def add_these(self, context, *emotes): - """Add a bunch of custom emotes.""" - - ran = False - # we could use *emotes: discord.PartialEmoji here but that would require spaces between each emote. - # and would fail if any arguments were not valid emotes - for match in re.finditer(utils.emote.RE_CUSTOM_EMOTE, ''.join(emotes)): - ran = True - animated, name, id = match.groups() - image_url = utils.emote.url(id, animated=animated) - async with context.typing(): - message = await self.add_safe(context, name, image_url) - await context.send(message) - - if not ran: - return await context.send('Error: no custom emotes were provided.') - - await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) - - @classmethod - def parse_add_command_args(cls, context, args): - if context.message.attachments: - return cls.parse_add_command_attachment(context, args) - - elif len(args) == 1: - match = utils.emote.RE_CUSTOM_EMOTE.match(args[0]) - if match is None: - raise commands.BadArgument( - 'Error: I expected a custom emote as the first argument, ' - 'but I got something else. ' - "If you're trying to add an emote using an image URL, " - 'you need to provide a name as the first argument, like this:\n' - '`{}add NAME_HERE URL_HERE`'.format(context.prefix)) - else: - animated, name, id = match.groups() - url = utils.emote.url(id, animated=animated) - - return name, url - - elif len(args) >= 2: - name = args[0] - match = utils.emote.RE_CUSTOM_EMOTE.match(args[1]) - if match is None: - url = utils.strip_angle_brackets(args[1]) - else: - url = utils.emote.url(match['id'], animated=match['animated']) - - return name, url - - elif not args: - raise commands.BadArgument('Your message had no emotes and no name!') - - @classmethod - def parse_add_command_attachment(cls, context, args): - attachment = context.message.attachments[0] - name = cls.format_emote_filename(''.join(args) if args else attachment.filename) - url = attachment.url - - return name, url - - @staticmethod - def format_emote_filename(filename): - """format a filename to an emote name as discord does when you upload an emote image""" - left, sep, right = posixpath.splitext(filename)[0].rpartition('-') - return (left or right).replace(' ', '') - - @commands.command(name='add-from-ec', aliases=['addfromec']) - async def add_from_ec(self, context, name, *names): - """Copies one or more emotes from Emote Collector to your server.""" - if names: - for name in (name,) + names: - await context.invoke(self.add_from_ec, name) - await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) - return - - try: - emote = self.ec_emotes[name.strip(':').lower()] - except KeyError: - return await context.send("Emote not found in Emote Collector's database.") - - reason = ( - f'Added from Emote Collector by {utils.format_user(context.author)}. ' - f'Original emote author ID: {emote["author"]}') - - image_url = utils.emote.url(emote['id'], animated=emote['animated']) - async with context.typing(): - message = await self.add_safe(context, name, image_url, reason=reason) - - await context.send(message) - - @public - @emote_type_filter_default - @commands.command() - @commands.bot_has_permissions(attach_files=True) - async def export(self, context, image_type='all'): - """Export all emotes from this server to a zip file, suitable for use with the import command. - - If “animated” is provided, only include animated emotes. - If “static” is provided, only include static emotes. - Otherwise, or if “all” is provided, export all emotes. - - This command requires the “attach files” permission. - """ - emotes = list(filter(image_type, context.guild.emojis)) - if not emotes: - raise commands.BadArgument('No emotes of that type were found in this server.') - - async with context.typing(): - async for zip_file in self.archive_emotes(context, emotes): - await context.send(file=zip_file) - - async def archive_emotes(self, context, emotes): - filesize_limit = context.guild.filesize_limit - discrims = collections.defaultdict(int) - downloaded = collections.deque() - async def download(emote): - # don't put two files in the zip with the same name - discrims[emote.name] += 1 - discrim = discrims[emote.name] - if discrim == 1: - name = emote.name - else: - name = f'{emote.name}-{discrim}' - - name = f'{name}.{"gif" if emote.animated else "png"}' - - # place some level of trust on discord's CDN to actually give us images - data = await self.fetch_safe(str(emote.url), validate_headers=False) - if type(data) is str: # error case - await context.send(f'{emote}: {data}') - return - - est_zip_overhead = len(name) + self.ZIP_OVERHEAD_BYTES - est_size_in_zip = est_zip_overhead + len(data) - if est_size_in_zip >= filesize_limit: - self.bot.loop.create_task( - context.send(f'{emote} could not be added because it alone would exceed the file size limit.') - ) - return - - downloaded.append((name, emote.created_at, est_size_in_zip, data)) - - await utils.gather_or_cancel(*map(download, emotes)) - - count = 1 - while True: - out = io.BytesIO() - with zipfile.ZipFile(out, 'w', compression=zipfile.ZIP_STORED) as zip: - while True: - try: - item = downloaded.popleft() - except IndexError: - break - - name, created_at, est_size, image_data = item - - if out.tell() + est_size >= filesize_limit: - # adding this emote would bring us over the file size limit - downloaded.appendleft(item) - break - - zinfo = zipfile.ZipInfo(name, date_time=created_at.timetuple()[:6]) - zip.writestr(zinfo, image_data) - - if out.tell() == 0: - # no emotes were written - break - - out.seek(0) - yield discord.File(out, f'emotes-{context.guild.id}-{count}.zip') - count += 1 - - @commands.command(name='import', aliases=['add-zip', 'add-tar', 'add-from-zip', 'add-from-tar']) - async def import_(self, context, url=None): - """Add several emotes from a .zip or .tar archive. - - You may either pass a URL to an archive or upload one as an attachment. - All valid GIF, PNG, and JPEG files in the archive will be uploaded as emotes. - The rest will be ignored. - """ - if url and context.message.attachments: - raise commands.BadArgument('Either a URL or an attachment must be given, not both.') - if not url and not context.message.attachments: - raise commands.BadArgument('A URL or attachment must be given.') - - self.emote_client.check_rl(context.guild.id) - - url = url or context.message.attachments[0].url - async with context.typing(): - archive = await self.fetch_safe(url, valid_mimetypes=self.ARCHIVE_MIMETYPES) - if type(archive) is str: # error case - await context.send(archive) - return - - await self.add_from_archive(context, archive) - with contextlib.suppress(discord.HTTPException): - # so they know when we're done - await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) - - async def add_from_archive(self, context, archive): - limit = 50_000_000 # prevent someone from trying to make a giant compressed file - async for name, img, error in utils.archive.extract_async(io.BytesIO(archive), size_limit=limit): - try: - utils.image.mime_type_for_image(img) - except errors.InvalidImageError: - continue - if error is None: - name = self.format_emote_filename(posixpath.basename(name)) - async with context.typing(): - message = await self.add_safe_bytes(context, name, img) - await context.send(message) - continue - - if isinstance(error, errors.FileTooBigError): - await context.send( - f'{name}: file too big. ' - f'The limit is {humanize.naturalsize(error.limit)} ' - f'but this file is {humanize.naturalsize(error.size)}.') - continue - - await context.send(f'{name}: {error}') - - async def add_safe(self, context, name, url, *, reason=None): - """Try to add an emote. Returns a string that should be sent to the user.""" - self.emote_client.check_rl(context.guild.id) - try: - image_data = await self.fetch_safe(url) - except errors.InvalidFileError: - raise errors.InvalidImageError - - if type(image_data) is str: # error case (shitty i know) - return image_data - return await self.add_safe_bytes(context, name, image_data, reason=reason) - - async def fetch_safe(self, url, valid_mimetypes=None, *, validate_headers=False): - """Try to fetch a URL. On error return a string that should be sent to the user.""" - try: - return await self.fetch(url, valid_mimetypes=valid_mimetypes, validate_headers=validate_headers) - except asyncio.TimeoutError: - return 'Error: retrieving the image took too long.' - except ValueError: - return 'Error: Invalid URL.' - except aiohttp.ClientResponseError as exc: - raise errors.HTTPException(exc.status) - - async def add_safe_bytes(self, context, name, image_data: bytes, *, reason=None): - """Try to add an emote from bytes. On error, return a string that should be sent to the user. - - If the image is static and there are not enough free static slots, convert the image to a gif instead. - """ - counts = collections.Counter(map(operator.attrgetter('animated'), context.guild.emojis)) - # >= rather than == because there are sneaky ways to exceed the limit - if counts[False] >= context.guild.emoji_limit and counts[True] >= context.guild.emoji_limit: - # we raise instead of returning a string in order to abort commands that run this function in a loop - raise commands.UserInputError('This server is out of emote slots.') - - static = utils.image.mime_type_for_image(image_data) != 'image/gif' - converted = False - if static and counts[False] >= context.guild.emoji_limit: - image_data = await utils.image.convert_to_gif_in_subprocess(image_data) - converted = True - - try: - emote = await self.create_emote_from_bytes(context, name, image_data, reason=reason) - except discord.InvalidArgument: - return discord.utils.escape_mentions(f'{name}: The file supplied was not a valid GIF, PNG, JPEG, or WEBP file.') - except discord.HTTPException as ex: - return discord.utils.escape_mentions( - f'{name}: An error occurred while creating the the emote:\n' - + utils.format_http_exception(ex)) - s = f'Emote {emote} successfully created' - return s + ' as a GIF.' if converted else s + '.' - - async def fetch(self, url, valid_mimetypes=IMAGE_MIMETYPES, *, validate_headers=True): - valid_mimetypes = valid_mimetypes or self.IMAGE_MIMETYPES - def validate_headers(response): - response.raise_for_status() - # some dumb servers also send '; charset=UTF-8' which we should ignore - mimetype, options = cgi.parse_header(response.headers.get('Content-Type', '')) - if mimetype not in valid_mimetypes: - raise errors.InvalidFileError - - async def validate(request): - try: - async with request as response: - validate_headers(response) - return await response.read() - except aiohttp.ClientResponseError: - raise - except aiohttp.ClientError as exc: - raise errors.EmoteManagerError(f'An error occurred while retrieving the file: {exc}') - - if validate_headers: await validate(self.http.head(url, timeout=self.bot.config.get('http_head_timeout', 10))) - return await validate(self.http.get(url)) - - async def create_emote_from_bytes(self, context, name, image_data: bytes, *, reason=None): - if len(image_data) > 256 * 1024: - image_data = await utils.image.resize_in_subprocess(image_data) - if reason is None: - reason = 'Created by ' + utils.format_user(context.author) - return await self.emote_client.create(guild=context.guild, name=name, image=image_data, reason=reason) - - @commands.command(aliases=('delete', 'delet', 'rm')) - async def remove(self, context, emote, *emotes): - """Remove an emote from this server. - - emotes: the name of an emote or of one or more emotes you'd like to remove. - """ - if not emotes: - emote = await self.parse_emote(context, emote) - await emote.delete(reason='Removed by ' + utils.format_user(context.author)) - await context.send(fr'Emote \:{emote.name}: successfully removed.') - else: - for emote in (emote,) + emotes: - await context.invoke(self.remove, emote) - with contextlib.suppress(discord.HTTPException): - await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) - - @commands.command(aliases=('mv',)) - async def rename(self, context, old, new_name): - """Rename an emote on this server. - - old: the name of the emote to rename, or the emote itself - new_name: what you'd like to rename it to - """ - emote = await self.parse_emote(context, old) - try: - await emote.edit( - name=new_name, - reason=f'Renamed by {utils.format_user(context.author)}') - except discord.HTTPException as ex: - return await context.send( - 'An error occurred while renaming the emote:\n' - + utils.format_http_exception(ex)) - - await context.send(fr'Emote successfully renamed to \:{new_name}:') - - @public - @emote_type_filter_default - @commands.command(aliases=('ls', 'dir')) - async def list(self, context, image_type='all'): - """A list of all emotes on this server. - - The list shows each emote and its raw form. - - If "animated" is provided, only show animated emotes. - If "static" is provided, only show static emotes. - If “all” is provided, show all emotes. - """ - emotes = sorted( - filter(image_type, context.guild.emojis), - key=lambda e: e.name.lower()) - - processed = [] - for emote in emotes: - raw = str(emote).replace(':', r'\:') - processed.append(f'{emote} {raw}') - - paginator = ListPaginator(context, processed) - self.paginators.add(paginator) - await paginator.begin() - - @public - @commands.command(aliases=['status']) - async def stats(self, context): - """The current number of animated and static emotes relative to the limits.""" - emote_limit = context.guild.emoji_limit - - static_emotes = animated_emotes = total_emotes = 0 - for emote in context.guild.emojis: - if emote.animated: - animated_emotes += 1 - else: - static_emotes += 1 - - total_emotes += 1 - - percent_static = round((static_emotes / emote_limit) * 100, 2) - percent_animated = round((animated_emotes / emote_limit) * 100, 2) - - static_left = emote_limit - static_emotes - animated_left = emote_limit - animated_emotes - - await context.send( - f'Static emotes: **{static_emotes} / {emote_limit}** ({static_left} left, {percent_static}% full)\n' - f'Animated emotes: **{animated_emotes} / {emote_limit}** ({animated_left} left, {percent_animated}% full)\n' - f'Total: **{total_emotes} / {emote_limit * 2}**') - - @commands.command(aliases=["embiggen"]) - async def big(self, context, emote): - """Shows the original image for the given emote. - - emote: the emote to embiggen. - """ - emote = await self.parse_emote(context, emote, local=False) - await context.send(f'{emote.name}: {emote.url}') - - async def parse_emote(self, context, name_or_emote, *, local=True): - # this function is mostly synchronous, - # so we yield in order to let the emoji cache update between repeated calls - await asyncio.sleep(0) - - match = utils.emote.RE_CUSTOM_EMOTE.match(name_or_emote) - if match: - id = int(match['id']) - if local: - emote = discord.utils.get(context.guild.emojis, id=id) - if emote: - return emote - else: - return discord.PartialEmoji( - animated=bool(match['animated']), - name=match['name'], - id=int(match['id']), - ) - name = name_or_emote - return await self.disambiguate(context, name) - - async def disambiguate(self, context, name): - name = name.strip(':') # in case the user tries :foo: and foo is animated - candidates = [e for e in context.guild.emojis if e.name.lower() == name.lower() and e.require_colons] - if not candidates: - raise errors.EmoteNotFoundError(name) - - if len(candidates) == 1: - return candidates[0] - - message = ['Multiple emotes were found with that name. Which one do you mean?'] - for i, emote in enumerate(candidates, 1): - message.append(fr'{i}. {emote} (\:{emote.name}:)') - - await context.send('\n'.join(message)) - - def check(message): - try: - int(message.content) - except ValueError: - return False - else: - return message.author == context.author - - try: - message = await self.bot.wait_for('message', check=check, timeout=30) - except asyncio.TimeoutError: - raise commands.UserInputError('Sorry, you took too long. Try again.') - - return candidates[int(message.content)-1] + IMAGE_MIMETYPES = {'image/png', 'image/jpeg', 'image/gif', 'image/webp'} + # TAR_MIMETYPES = {'application/x-tar', 'application/x-xz', 'application/gzip', 'application/x-bzip2'} + TAR_MIMETYPES = {'application/x-tar'} + ZIP_MIMETYPES = {'application/zip', 'application/octet-stream', + 'application/x-zip-compressed', 'multipart/x-zip'} + ARCHIVE_MIMETYPES = TAR_MIMETYPES | ZIP_MIMETYPES + ZIP_OVERHEAD_BYTES = 30 + + def __init__(self, bot): + self.bot = bot + + connector = None + socks5_url = self.bot.config.get('socks5_proxy_url') + if socks5_url: + from aiohttp_socks import SocksConnector + connector = SocksConnector.from_url(socks5_url, rdns=True) + + self.http = aiohttp.ClientSession( + loop=self.bot.loop, + read_timeout=self.bot.config.get('http_read_timeout', 60), + connector=connector if self.bot.config.get( + 'use_socks5_for_all_connections') else None, + headers={ + 'User-Agent': + self.bot.config['user_agent'] + ' ' + + self.bot.http.user_agent + }) + + self.emote_client = EmoteClient(self.bot) + + with open('data/ec-emotes-final.json') as f: + self.ec_emotes = json.load(f) + + # keep track of paginators so we can end them when the cog is unloaded + self.paginators = weakref.WeakSet() + + def cog_unload(self): + async def close(): + await self.http.close() + await self.emote_client.close() + + for paginator in self.paginators: + await paginator.stop() + + self.bot.loop.create_task(close()) + + public_commands = set() + + # resolve some kinda scope issue that i don't understand + def public(command, public_commands=public_commands): + public_commands.add(command.qualified_name) + return command + + async def cog_check(self, context): + if not context.guild: + raise commands.NoPrivateMessage + + # we can't just do `context.command in self.public_commands` here + # because apparently Command.__eq__ is not defined + if context.command.qualified_name in self.public_commands: + return True + + if ( + not context.author.guild_permissions.manage_emojis + or not context.guild.me.guild_permissions.manage_emojis + ): + raise errors.MissingManageEmojisPermission + + return True + + @commands.Cog.listener() + async def on_command_error(self, context, error): + if isinstance(error, errors.EmoteManagerError): + await context.send(error) + + if isinstance(error, commands.NoPrivateMessage): + await context.send( + f'{utils.SUCCESS_EMOJIS[False]} Sorry, this command may only be used in a server.') + + @commands.command(usage='[name] ') + async def add(self, context, *args): + """Add a new emote to this server. + + You can use it like this: + `add :thonkang:` (if you already have that emote) + `add rollsafe https://image.noelshack.com/fichiers/2017/06/1486495269-rollsafe.png` + `add speedtest ` + + With a file attachment: + `add name` will upload a new emote using the first attachment as the image and call it `name` + `add` will upload a new emote using the first attachment as the image, + and its filename as the name + """ + name, url = self.parse_add_command_args(context, args) + async with context.typing(): + message = await self.add_safe(context, name, url) + await context.send(message) + + @commands.command(name='add-these') + async def add_these(self, context, *emotes): + """Add a bunch of custom emotes.""" + + ran = False + # we could use *emotes: nextcord.PartialEmoji here but that would require spaces between each emote. + # and would fail if any arguments were not valid emotes + for match in re.finditer(utils.emote.RE_CUSTOM_EMOTE, ''.join(emotes)): + ran = True + animated, name, id = match.groups() + image_url = utils.emote.url(id, animated=animated) + async with context.typing(): + message = await self.add_safe(context, name, image_url) + await context.send(message) + + if not ran: + return await context.send('Error: no custom emotes were provided.') + + await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) + + @classmethod + def parse_add_command_args(cls, context, args): + if context.message.attachments: + return cls.parse_add_command_attachment(context, args) + + elif len(args) == 1: + match = utils.emote.RE_CUSTOM_EMOTE.match(args[0]) + if match is None: + raise commands.BadArgument( + 'Error: I expected a custom emote as the first argument, ' + 'but I got something else. ' + "If you're trying to add an emote using an image URL, " + 'you need to provide a name as the first argument, like this:\n' + '`{}add NAME_HERE URL_HERE`'.format(context.prefix)) + else: + animated, name, id = match.groups() + url = utils.emote.url(id, animated=animated) + + return name, url + + elif len(args) >= 2: + name = args[0] + match = utils.emote.RE_CUSTOM_EMOTE.match(args[1]) + if match is None: + url = utils.strip_angle_brackets(args[1]) + else: + url = utils.emote.url(match['id'], animated=match['animated']) + + return name, url + + elif not args: + raise commands.BadArgument( + 'Your message had no emotes and no name!') + + @classmethod + def parse_add_command_attachment(cls, context, args): + attachment = context.message.attachments[0] + name = cls.format_emote_filename( + ''.join(args) if args else attachment.filename) + url = attachment.url + + return name, url + + @staticmethod + def format_emote_filename(filename): + """format a filename to an emote name as nextcord does when you upload an emote image""" + left, sep, right = posixpath.splitext(filename)[0].rpartition('-') + return (left or right).replace(' ', '') + + @commands.command(name='add-from-ec', aliases=['addfromec']) + async def add_from_ec(self, context, name, *names): + """Copies one or more emotes from Emote Collector to your server.""" + if names: + for name in (name,) + names: + await context.invoke(self.add_from_ec, name) + await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) + return + + try: + emote = self.ec_emotes[name.strip(':').lower()] + except KeyError: + return await context.send("Emote not found in Emote Collector's database.") + + reason = ( + f'Added from Emote Collector by {utils.format_user(context.author)}. ' + f'Original emote author ID: {emote["author"]}') + + image_url = utils.emote.url(emote['id'], animated=emote['animated']) + async with context.typing(): + message = await self.add_safe(context, name, image_url, reason=reason) + + await context.send(message) + + @public + @emote_type_filter_default + @commands.command() + @commands.bot_has_permissions(attach_files=True) + async def export(self, context, image_type='all'): + """Export all emotes from this server to a zip file, suitable for use with the import command. + + If “animated” is provided, only include animated emotes. + If “static” is provided, only include static emotes. + Otherwise, or if “all” is provided, export all emotes. + + This command requires the “attach files” permission. + """ + emotes = list(filter(image_type, context.guild.emojis)) + if not emotes: + raise commands.BadArgument( + 'No emotes of that type were found in this server.') + + async with context.typing(): + async for zip_file in self.archive_emotes(context, emotes): + await context.send(file=zip_file) + + async def archive_emotes(self, context, emotes): + filesize_limit = context.guild.filesize_limit + discrims = collections.defaultdict(int) + downloaded = collections.deque() + + async def download(emote): + # don't put two files in the zip with the same name + discrims[emote.name] += 1 + discrim = discrims[emote.name] + if discrim == 1: + name = emote.name + else: + name = f'{emote.name}-{discrim}' + + name = f'{name}.{"gif" if emote.animated else "png"}' + + # place some level of trust on nextcord's CDN to actually give us images + data = await self.fetch_safe(str(emote.url), validate_headers=False) + if type(data) is str: # error case + await context.send(f'{emote}: {data}') + return + + est_zip_overhead = len(name) + self.ZIP_OVERHEAD_BYTES + est_size_in_zip = est_zip_overhead + len(data) + if est_size_in_zip >= filesize_limit: + self.bot.loop.create_task( + context.send( + f'{emote} could not be added because it alone would exceed the file size limit.') + ) + return + + downloaded.append((name, emote.created_at, est_size_in_zip, data)) + + await utils.gather_or_cancel(*map(download, emotes)) + + count = 1 + while True: + out = io.BytesIO() + with zipfile.ZipFile(out, 'w', compression=zipfile.ZIP_STORED) as zip: + while True: + try: + item = downloaded.popleft() + except IndexError: + break + + name, created_at, est_size, image_data = item + + if out.tell() + est_size >= filesize_limit: + # adding this emote would bring us over the file size limit + downloaded.appendleft(item) + break + + zinfo = zipfile.ZipInfo( + name, date_time=created_at.timetuple()[:6]) + zip.writestr(zinfo, image_data) + + if out.tell() == 0: + # no emotes were written + break + + out.seek(0) + yield nextcord.File(out, f'emotes-{context.guild.id}-{count}.zip') + count += 1 + + @commands.command(name='import', aliases=['add-zip', 'add-tar', 'add-from-zip', 'add-from-tar']) + async def import_(self, context, url=None): + """Add several emotes from a .zip or .tar archive. + + You may either pass a URL to an archive or upload one as an attachment. + All valid GIF, PNG, and JPEG files in the archive will be uploaded as emotes. + The rest will be ignored. + """ + if url and context.message.attachments: + raise commands.BadArgument( + 'Either a URL or an attachment must be given, not both.') + if not url and not context.message.attachments: + raise commands.BadArgument('A URL or attachment must be given.') + + self.emote_client.check_rl(context.guild.id) + + url = url or context.message.attachments[0].url + async with context.typing(): + archive = await self.fetch_safe(url, valid_mimetypes=self.ARCHIVE_MIMETYPES) + if type(archive) is str: # error case + await context.send(archive) + return + + await self.add_from_archive(context, archive) + with contextlib.suppress(nextcord.HTTPException): + # so they know when we're done + await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) + + async def add_from_archive(self, context, archive): + limit = 50_000_000 # prevent someone from trying to make a giant compressed file + async for name, img, error in utils.archive.extract_async(io.BytesIO(archive), size_limit=limit): + try: + utils.image.mime_type_for_image(img) + except errors.InvalidImageError: + continue + if error is None: + name = self.format_emote_filename(posixpath.basename(name)) + async with context.typing(): + message = await self.add_safe_bytes(context, name, img) + await context.send(message) + continue + + if isinstance(error, errors.FileTooBigError): + await context.send( + f'{name}: file too big. ' + f'The limit is {humanize.naturalsize(error.limit)} ' + f'but this file is {humanize.naturalsize(error.size)}.') + continue + + await context.send(f'{name}: {error}') + + async def add_safe(self, context, name, url, *, reason=None): + """Try to add an emote. Returns a string that should be sent to the user.""" + self.emote_client.check_rl(context.guild.id) + try: + image_data = await self.fetch_safe(url) + except errors.InvalidFileError: + raise errors.InvalidImageError + + if type(image_data) is str: # error case (shitty i know) + return image_data + return await self.add_safe_bytes(context, name, image_data, reason=reason) + + async def fetch_safe(self, url, valid_mimetypes=None, *, validate_headers=False): + """Try to fetch a URL. On error return a string that should be sent to the user.""" + try: + return await self.fetch(url, valid_mimetypes=valid_mimetypes, validate_headers=validate_headers) + except asyncio.TimeoutError: + return 'Error: retrieving the image took too long.' + except ValueError: + return 'Error: Invalid URL.' + except aiohttp.ClientResponseError as exc: + raise errors.HTTPException(exc.status) + + async def add_safe_bytes(self, context, name, image_data: bytes, *, reason=None): + """Try to add an emote from bytes. On error, return a string that should be sent to the user. + + If the image is static and there are not enough free static slots, convert the image to a gif instead. + """ + counts = collections.Counter( + map(operator.attrgetter('animated'), context.guild.emojis)) + # >= rather than == because there are sneaky ways to exceed the limit + if counts[False] >= context.guild.emoji_limit and counts[True] >= context.guild.emoji_limit: + # we raise instead of returning a string in order to abort commands that run this function in a loop + raise commands.UserInputError('This server is out of emote slots.') + + static = utils.image.mime_type_for_image(image_data) != 'image/gif' + converted = False + if static and counts[False] >= context.guild.emoji_limit: + image_data = await utils.image.convert_to_gif_in_subprocess(image_data) + converted = True + + try: + emote = await self.create_emote_from_bytes(context, name, image_data, reason=reason) + except nextcord.InvalidArgument: + return nextcord.utils.escape_mentions(f'{name}: The file supplied was not a valid GIF, PNG, JPEG, or WEBP file.') + except nextcord.HTTPException as ex: + return nextcord.utils.escape_mentions( + f'{name}: An error occurred while creating the the emote:\n' + + utils.format_http_exception(ex)) + s = f'Emote {emote} successfully created' + return s + ' as a GIF.' if converted else s + '.' + + async def fetch(self, url, valid_mimetypes=IMAGE_MIMETYPES, *, validate_headers=True): + valid_mimetypes = valid_mimetypes or self.IMAGE_MIMETYPES + + def validate_headers(response): + response.raise_for_status() + # some dumb servers also send '; charset=UTF-8' which we should ignore + mimetype, options = cgi.parse_header( + response.headers.get('Content-Type', '')) + if mimetype not in valid_mimetypes: + raise errors.InvalidFileError + + async def validate(request): + try: + async with request as response: + validate_headers(response) + return await response.read() + except aiohttp.ClientResponseError: + raise + except aiohttp.ClientError as exc: + raise errors.EmoteManagerError( + f'An error occurred while retrieving the file: {exc}') + + if validate_headers: + await validate(self.http.head(url, timeout=self.bot.config.get('http_head_timeout', 10))) + return await validate(self.http.get(url)) + + async def create_emote_from_bytes(self, context, name, image_data: bytes, *, reason=None): + if len(image_data) > 256 * 1024: + image_data = await utils.image.resize_in_subprocess(image_data) + if reason is None: + reason = 'Created by ' + utils.format_user(context.author) + return await self.emote_client.create(guild=context.guild, name=name, image=image_data, reason=reason) + + @commands.command(aliases=('delete', 'delet', 'rm')) + async def remove(self, context, emote, *emotes): + """Remove an emote from this server. + + emotes: the name of an emote or of one or more emotes you'd like to remove. + """ + if not emotes: + emote = await self.parse_emote(context, emote) + await emote.delete(reason='Removed by ' + utils.format_user(context.author)) + await context.send(fr'Emote \:{emote.name}: successfully removed.') + else: + for emote in (emote,) + emotes: + await context.invoke(self.remove, emote) + with contextlib.suppress(nextcord.HTTPException): + await context.message.add_reaction(utils.SUCCESS_EMOJIS[True]) + + @commands.command(aliases=('mv',)) + async def rename(self, context, old, new_name): + """Rename an emote on this server. + + old: the name of the emote to rename, or the emote itself + new_name: what you'd like to rename it to + """ + emote = await self.parse_emote(context, old) + try: + await emote.edit( + name=new_name, + reason=f'Renamed by {utils.format_user(context.author)}') + except nextcord.HTTPException as ex: + return await context.send( + 'An error occurred while renaming the emote:\n' + + utils.format_http_exception(ex)) + + await context.send(fr'Emote successfully renamed to \:{new_name}:') + + @public + @emote_type_filter_default + @commands.command(aliases=('ls', 'dir')) + async def list(self, context, image_type='all'): + """A list of all emotes on this server. + + The list shows each emote and its raw form. + + If "animated" is provided, only show animated emotes. + If "static" is provided, only show static emotes. + If “all” is provided, show all emotes. + """ + emotes = sorted( + filter(image_type, context.guild.emojis), + key=lambda e: e.name.lower()) + + processed = [] + for emote in emotes: + raw = str(emote).replace(':', r'\:') + processed.append(f'{emote} {raw}') + + paginator = ListPaginator(context, processed) + self.paginators.add(paginator) + await paginator.begin() + + @public + @commands.command(aliases=['status']) + async def stats(self, context): + """The current number of animated and static emotes relative to the limits.""" + emote_limit = context.guild.emoji_limit + + static_emotes = animated_emotes = total_emotes = 0 + for emote in context.guild.emojis: + if emote.animated: + animated_emotes += 1 + else: + static_emotes += 1 + + total_emotes += 1 + + percent_static = round((static_emotes / emote_limit) * 100, 2) + percent_animated = round((animated_emotes / emote_limit) * 100, 2) + + static_left = emote_limit - static_emotes + animated_left = emote_limit - animated_emotes + + await context.send( + f'Static emotes: **{static_emotes} / {emote_limit}** ({static_left} left, {percent_static}% full)\n' + f'Animated emotes: **{animated_emotes} / {emote_limit}** ({animated_left} left, {percent_animated}% full)\n' + f'Total: **{total_emotes} / {emote_limit * 2}**') + + @commands.command(aliases=["embiggen"]) + async def big(self, context, emote): + """Shows the original image for the given emote. + + emote: the emote to embiggen. + """ + emote = await self.parse_emote(context, emote, local=False) + await context.send(f'{emote.name}: {emote.url}') + + async def parse_emote(self, context, name_or_emote, *, local=True): + # this function is mostly synchronous, + # so we yield in order to let the emoji cache update between repeated calls + await asyncio.sleep(0) + + match = utils.emote.RE_CUSTOM_EMOTE.match(name_or_emote) + if match: + id = int(match['id']) + if local: + emote = nextcord.utils.get(context.guild.emojis, id=id) + if emote: + return emote + else: + return nextcord.PartialEmoji( + animated=bool(match['animated']), + name=match['name'], + id=int(match['id']), + ) + name = name_or_emote + return await self.disambiguate(context, name) + + async def disambiguate(self, context, name): + # in case the user tries :foo: and foo is animated + name = name.strip(':') + candidates = [e for e in context.guild.emojis if e.name.lower( + ) == name.lower() and e.require_colons] + if not candidates: + raise errors.EmoteNotFoundError(name) + + if len(candidates) == 1: + return candidates[0] + + message = [ + 'Multiple emotes were found with that name. Which one do you mean?'] + for i, emote in enumerate(candidates, 1): + message.append(fr'{i}. {emote} (\:{emote.name}:)') + + await context.send('\n'.join(message)) + + def check(message): + try: + int(message.content) + except ValueError: + return False + else: + return message.author == context.author + + try: + message = await self.bot.wait_for('message', check=check, timeout=30) + except asyncio.TimeoutError: + raise commands.UserInputError( + 'Sorry, you took too long. Try again.') + + return candidates[int(message.content)-1] + def setup(bot): - bot.add_cog(Emotes(bot)) + bot.add_cog(Emotes(bot)) diff --git a/data/config.example.py b/data/config.example.py index d69fa4c..5bbea18 100644 --- a/data/config.example.py +++ b/data/config.example.py @@ -1,49 +1,53 @@ { - 'description': - 'Emote Manager lets you manage custom server emotes effortlessly.\n\n' - 'NOTE: Most commands will be unavailable until both you and the bot have the ' - '"Manage Emojis" permission.', + 'description': + 'Emote Manager lets you manage custom server emotes effortlessly.\n\n' + 'NOTE: Most commands will be unavailable until both you and the bot have the ' + '"Manage Emojis" permission.', - # a channel ID to invite people to when they request help with the bot - # the bot must have Create Instant Invite permissions for this channel - # if set to None, the support command will be disabled - 'support_server_invite_channel': None, - - 'prefixes': ['em/'], + # a channel ID to invite people to when they request help with the bot + # the bot must have Create Instant Invite permissions for this channel + # if set to None, the support command will be disabled + 'support_server_invite_channel': None, - 'tokens': { - 'discord': 'sek.rit.token', - }, + 'prefixes': ['em/'], - 'ignore_bots': { - 'default': True, - 'overrides': { - 'channels': [ - ], - 'guilds': [ - ], - }, - }, + 'tokens': { + 'discord': 'sek.rit.token', + }, - 'copyright_license_file': 'data/short-license.txt', + 'ignore_bots': { + 'default': True, + 'overrides': { + 'channels': [ + ], + 'guilds': [ + ], + }, + }, - 'socks5_proxy_url': None, # required for connecting to the EC API over a Tor onion service - 'use_socks5_for_all_connections': False, # whether to use socks5 for all HTTP operations (other than discord.py) - 'user_agent': 'EmoteManagerBot (https://github.com/iomintz/emote-manager-bot)', - 'ec_api_base_url': None, # set to None to use the default of https://ec.emote.bot/api/v0 - 'http_head_timeout': 10, # timeout for the initial HEAD request before retrieving any images (up this if using Tor) - 'http_read_timeout': 60, # timeout for retrieving an image + 'copyright_license_file': 'data/short-license.txt', - # emotes that the bot may use to respond to you - # If not provided, the bot will use '❌', '✅' instead. - # - # You can obtain these ones from the discordbots.org server under the name "tickNo" and "tickYes" - # but I uploaded them to my test server - # so that both the staging and the stable versions of the bot can use them - 'response_emojis': { - 'success': { # emotes used to indicate success or failure - False: '<:error:478164511879069707>', - True: '<:success:478164452261363712>' - }, - }, + # required for connecting to the EC API over a Tor onion service + 'socks5_proxy_url': None, + # whether to use socks5 for all HTTP operations (other than discord.py) + 'use_socks5_for_all_connections': False, + 'user_agent': 'EmoteManagerBot (https://github.com/iomintz/emote-manager-bot)', + # set to None to use the default of https://ec.emote.bot/api/v0 + 'ec_api_base_url': None, + # timeout for the initial HEAD request before retrieving any images (up this if using Tor) + 'http_head_timeout': 10, + 'http_read_timeout': 60, # timeout for retrieving an image + + # emotes that the bot may use to respond to you + # If not provided, the bot will use '❌', '✅' instead. + # + # You can obtain these ones from the discordbots.org server under the name "tickNo" and "tickYes" + # but I uploaded them to my test server + # so that both the staging and the stable versions of the bot can use them + 'response_emojis': { + 'success': { # emotes used to indicate success or failure + False: '<:error:478164511879069707>', + True: '<:success:478164452261363712>' + }, + }, }