#!/usr/bin/env python3
"""Plot per-day post frequency for a booru tag query.

Usage: timeliner.py <booru> <tags>

Fetched posts are cached in ./timeliner-cache.db so a repeated query for the
same (booru, tags) pair skips the API entirely.
"""
import sys
import sqlite3
import requests
import logging
import datetime
import plotly.express as px
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Optional

log = logging.getLogger(__name__)


@dataclass
class Post:
    # md5 of the post's file — unique per (booru, query) in the cache table
    hash: str
    inserted_at: datetime.datetime


@dataclass
class GelbooruCursor:
    """Iterator over a Gelbooru tag query; each step yields one page (<=100 posts)."""

    query: str
    page: int = 0
    count: Optional[int] = None  # total result count, taken from the first response

    def __iter__(self):
        return self

    def __next__(self) -> List[Post]:
        resp = requests.get(
            "https://gelbooru.com/index.php",
            params={
                "page": "dapi",
                "s": "post",
                "json": "1",
                "q": "index",
                "tags": self.query,
                "limit": "100",
                "pid": self.page,
            },
            timeout=30,  # don't hang forever on a stalled connection
        )
        log.debug("made request to %r", resp.url)
        # raise_for_status() gives a descriptive error and, unlike `assert`,
        # is not stripped when running under `python -O`
        resp.raise_for_status()
        rjson = resp.json()
        attrs = rjson["@attributes"]
        if "post" not in rjson:
            log.info("page %d reached end of tag", self.page)
            raise StopIteration()
        # `is None`, not `or`: a legitimate total of 0 is falsy and would
        # otherwise be re-read from every page
        if self.count is None:
            self.count = attrs["count"]
        log.info(
            "page %d gave %d posts (total %d)",
            self.page,
            len(rjson["post"]),
            self.count,
        )
        results = []
        for entry in rjson["post"]:
            # e.g. "Mon Apr 01 12:34:56 +0000 2024". datetime.strptime honours
            # %z; the old time.strptime + time.mktime pair dropped the UTC
            # offset and reinterpreted the fields as local time, skewing every
            # timestamp by the local UTC offset.
            parsed = datetime.datetime.strptime(
                entry["created_at"], "%a %b %d %H:%M:%S %z %Y"
            )
            results.append(Post(entry["md5"], parsed))
        self.page += 1
        return results


@dataclass
class Gelbooru:
    # class-level constants (no annotation, so not dataclass fields)
    typeid = 1
    name = "Gelbooru"

    def fetchall(self, query: str) -> GelbooruCursor:
        """Return a paging cursor over all posts matching `query`."""
        return GelbooruCursor(query)


def main():
    logging.basicConfig(level=logging.DEBUG)
    log.debug("%r", sys.argv)
    try:
        booru = sys.argv[1]
        tags = sys.argv[2]
    except IndexError:
        log.error("expected booru and tags argument")
        return 1

    if booru == "gelbooru":
        booru_client = Gelbooru()
    elif booru == "danbooru":
        # was `elif booru == "gelbooru"` (unreachable duplicate condition) and
        # instantiated an undefined `Danbooru` class, which would NameError
        # before reaching the intended NotImplementedError
        raise NotImplementedError("danbooru support is not implemented yet")
    else:
        log.error("booru must be one of {gelbooru, danbooru}")
        return 1

    db = sqlite3.connect("./timeliner-cache.db")
    db.executescript(
        """
        CREATE TABLE IF NOT EXISTS file_store (
            booru_type text not null,
            query text not null,
            file_hash text not null,
            inserted_at int not null,
            constraint file_store_pk primary key (booru_type, query, file_hash)
        ) strict;
        """
    )
    try:
        cur = db.execute(
            "select file_hash, inserted_at from file_store where booru_type = ? and query = ?",
            (booru_client.typeid, tags),
        )
        post_entries = cur.fetchall()
        if not post_entries:
            posts = []  # final data
            for incoming_posts in booru_client.fetchall(tags):
                posts.extend(incoming_posts)
                for post in incoming_posts:
                    db.execute(
                        "insert into file_store values (?, ?, ?, ?)",
                        (
                            booru_client.typeid,
                            tags,
                            post.hash,
                            # the column is INTEGER in a STRICT table, which
                            # rejects floats with a fractional part; truncate
                            int(post.inserted_at.timestamp()),
                        ),
                    )
            log.info("fetched %d posts", len(posts))
            db.commit()
        else:
            posts = [
                Post(row[0], datetime.datetime.fromtimestamp(row[1]))
                for row in post_entries
            ]
            log.info("cached %d posts", len(posts))
    finally:
        db.close()

    # bucket posts by calendar day, then plot the per-day frequency
    buckets = defaultdict(int)
    for post in posts:
        date = (post.inserted_at.year, post.inserted_at.month, post.inserted_at.day)
        buckets[date] += 1

    post_frequencies = [
        (f"{y}/{m}/{d}", buckets[(y, m, d)]) for (y, m, d) in sorted(buckets)
    ]
    fig = px.line(
        post_frequencies,
        x=0,
        y=1,
        title=f"amount of posts per day for given query ({tags}) in booru ({booru_client.name})",
    )
    fig.show()
    return 0


if __name__ == "__main__":
    sys.exit(main())