# e621_api_cloner/build_database.py
#
# 56 lines
# 1.6 KiB
# Python

import asyncio
import logging
import shutil
import tempfile
import sys
from dataclasses import dataclass
from urllib.parse import urlparse
from pathlib import Path
import aiohttp
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
@dataclass
class Context:
    """Bundle of shared objects handed to the download code."""

    # HTTP client session used to issue the GET requests.
    session: aiohttp.ClientSession
async def _download(ctx: "Context", url: str, output_path: Path) -> None:
    """Fetch *url* and write the response body to *output_path*.

    The body is first spooled into an anonymous temporary file and only
    then copied to *output_path*, so the output file is not created
    until the whole response has been received.

    Raises:
        RuntimeError: if the server answers with a status other than 200.
    """
    log.info("downloading %r into %s", url, output_path)
    async with ctx.session.get(url) as resp:
        # Explicit check rather than ``assert`` so that it is not
        # stripped when Python runs with ``-O``.
        if resp.status != 200:
            raise RuntimeError(
                f"unexpected HTTP status {resp.status} while fetching {url}"
            )

        with tempfile.TemporaryFile() as temp_fd:
            async for chunk in resp.content.iter_chunked(8192):
                temp_fd.write(chunk)

            # Rewind before copying: copyfileobj() reads from the current
            # file position, which is at end-of-file after the writes
            # above. Without this the output file would be empty.
            temp_fd.seek(0)

            # write to output
            log.info("copying temp to output")
            with output_path.open(mode="wb") as output_fd:
                shutil.copyfileobj(temp_fd, output_fd)


async def main():
    """Download the e621 tag and post DB exports for the requested date.

    The date is read from the first command-line argument and inserted
    verbatim into the export URLs. Each export is saved in the current
    working directory under the file name taken from its URL; a file that
    already exists there is left untouched and not downloaded again.

    Raises:
        SystemExit: if no date argument was given on the command line.
        RuntimeError: if a download does not return HTTP 200.
    """
    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} <export-date>")
    wanted_date = sys.argv[1]

    urls = (
        f"https://e621.net/db_export/tags-{wanted_date}.csv.gz",
        f"https://e621.net/db_export/posts-{wanted_date}.csv.gz",
    )

    async with aiohttp.ClientSession() as session:
        ctx = Context(session)

        for url in urls:
            # Use the last path component of the URL as the local file name.
            output_path = Path.cwd() / Path(urlparse(url).path).name
            if output_path.exists():
                log.info("file %s already exists, ignoring", output_path)
                continue
            await _download(ctx, url, output_path)

        # now that everything's downloaded, compile the db
        # TODO: the compile step is not present in this part of the file.
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Emit INFO-level progress messages from the downloader.
    logging.basicConfig(level=logging.INFO)
    # Drive the async entry point to completion on a fresh event loop.
    asyncio.run(main())