mirror of
https://github.com/uhIgnacio/EmoteManager.git
synced 2024-08-15 02:23:13 +00:00
add zip/tar archive support 🎉
This commit is contained in:
parent
7dd40ce0e5
commit
10ed6bb63f
5 changed files with 199 additions and 40 deletions
|
@ -2,6 +2,7 @@
|
|||
# encoding: utf-8
|
||||
|
||||
from .misc import *
|
||||
from . import archive
|
||||
from . import emote
|
||||
from . import errors
|
||||
from . import paginator
|
||||
|
|
92
utils/archive.py
Normal file
92
utils/archive.py
Normal file
|
@ -0,0 +1,92 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import enum
|
||||
import posixpath
|
||||
import tarfile
|
||||
import typing.io
|
||||
import zipfile
|
||||
from typing import Iterable, Tuple, Optional
|
||||
|
||||
from . import errors
|
||||
|
||||
ArchiveInfo = collections.namedtuple('ArchiveInfo', 'filename content error')
|
||||
|
||||
def extract(archive: typing.io.BinaryIO, *, size_limit=None) \
|
||||
-> Iterable[Tuple[str, Optional[bytes], Optional[BaseException]]]:
|
||||
"""
|
||||
extract a binary file-like object representing a zip or uncompressed tar archive, yielding filenames and contents.
|
||||
|
||||
yields ArchiveInfo objects: (filename: str, content: typing.Optional[bytes], error: )
|
||||
if size_limit is not None and the size limit is exceeded, or for any other error, yield None for content
|
||||
on success, error will be None
|
||||
"""
|
||||
|
||||
try:
|
||||
yield from extract_zip(archive, size_limit=size_limit)
|
||||
return
|
||||
except zipfile.BadZipFile:
|
||||
pass
|
||||
finally:
|
||||
archive.seek(0)
|
||||
|
||||
try:
|
||||
yield from extract_tar(archive, size_limit=size_limit)
|
||||
except tarfile.ReadError as exc:
|
||||
raise ValueError('not a valid zip or tar file') from exc
|
||||
finally:
|
||||
archive.seek(0)
|
||||
|
||||
def extract_zip(archive, *, size_limit=None):
|
||||
with zipfile.ZipFile(archive) as zip:
|
||||
members = [m for m in zip.infolist() if not m.is_dir()]
|
||||
for member in members:
|
||||
if size_limit is not None and member.file_size >= size_limit:
|
||||
yield ArchiveInfo(
|
||||
filename=member.filename,
|
||||
content=None,
|
||||
error=errors.FileTooBigError(member.file_size, size_limit))
|
||||
continue
|
||||
|
||||
try:
|
||||
content = zip.open(member).read()
|
||||
except RuntimeError as exc: # why no specific exceptions smh
|
||||
yield ArchiveInfo(filename=member.filename, content=None, error=exc)
|
||||
else: # this else is required to avoid UnboundLocalError for some reason
|
||||
yield ArchiveInfo(filename=member.filename, content=content, error=None)
|
||||
|
||||
def extract_tar(archive, *, size_limit=None):
|
||||
with tarfile.open(fileobj=archive) as tar:
|
||||
members = [f for f in tar.getmembers() if f.isfile()]
|
||||
for member in members:
|
||||
if size_limit is not None and member.size >= size_limit:
|
||||
yield ArchiveInfo(
|
||||
filename=member.name,
|
||||
content=None,
|
||||
error=errors.FileTooBigError(member.size, size_limit))
|
||||
continue
|
||||
|
||||
yield ArchiveInfo(member.name, content=tar.extractfile(member).read(), error=None)
|
||||
|
||||
async def extract_async(archive: typing.io.BinaryIO, size_limit=None):
|
||||
gen = await asyncio.get_event_loop().run_in_executor(None, extract, archive)
|
||||
for x in gen:
|
||||
yield await asyncio.sleep(0, x)
|
||||
|
||||
def main():
|
||||
import io
|
||||
import sys
|
||||
|
||||
import humanize
|
||||
|
||||
arc = io.BytesIO(sys.stdin.detach().read())
|
||||
for name, data, error in extract(arc):
|
||||
if error is not None:
|
||||
print(f'{name}: {error}')
|
||||
continue
|
||||
|
||||
print(f'{name}: {humanize.naturalsize(len(data)):>10}')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -4,7 +4,6 @@ from discord.ext import commands
|
|||
|
||||
import utils
|
||||
|
||||
|
||||
class MissingManageEmojisPermission(commands.MissingPermissions):
|
||||
"""The invoker or the bot doesn't have permissions to manage server emojis."""
|
||||
|
||||
|
@ -33,16 +32,21 @@ class EmoteNotFoundError(EmoteManagerError):
|
|||
def __init__(self, name):
|
||||
super().__init__(f'An emote called `{name}` does not exist in this server.')
|
||||
|
||||
class InvalidImageError(EmoteManagerError):
|
||||
class FileTooBigError(EmoteManagerError):
|
||||
def __init__(self, size, limit):
|
||||
self.size = size
|
||||
self.limit = limit
|
||||
|
||||
class InvalidFileError(EmoteManagerError):
|
||||
"""The file is not a zip, tar, GIF, PNG, or JPG file."""
|
||||
def __init__(self):
|
||||
super().__init__('Invalid file given.')
|
||||
|
||||
class InvalidImageError(InvalidFileError):
|
||||
"""The image is not a GIF, PNG, or JPG"""
|
||||
def __init__(self):
|
||||
super().__init__('The image supplied was not a GIF, PNG, or JPG.')
|
||||
|
||||
class NoMoreSlotsError(EmoteManagerError):
|
||||
"""Raised in case all slots of a particular type (static/animated) are full"""
|
||||
def __init__(self):
|
||||
super().__init__('No more backend slots available.')
|
||||
|
||||
class PermissionDeniedError(EmoteManagerError):
|
||||
"""Raised when a user tries to modify an emote without the Manage Emojis permission"""
|
||||
def __init__(self, name):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue