mistress
Luna 6 months ago
parent b671c824fc
commit 96c7194a1a

1
.gitignore vendored

@ -152,3 +152,4 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
*.db

@ -1,3 +1,18 @@
# booru-tag-timeline
graph amount of posts with a certain tag over time made in a booru (Danbooru and Gelbooru supported)
graph amount of posts with a certain tag over time made in a booru (Danbooru and Gelbooru supported)
## use
```sh
pip install -Ur requirements.txt
# THERE IS LITERALLY THE ONE THING I HAD TO ASK THAT MADE THIS
# SOFTWARE INTO A THING.
#
# "HOW MUCH PORN OF BRIDGET WAS MADE BECAUSE OF THE TRANS ANNOUNCEMENT?"
#
# THIS IS AN IMPORTANT SCIENTIFIC QUESTION, AS A TRANS MYSELF, I NEED TO
# KNOW HOW MUCH GIRLDICK WE GOT
./timeliner.py gelbooru 'bridget_(guilty_gear)'
```

@ -0,0 +1,3 @@
plotly>5.10.0
requests>2.28.1
pandas>1.4.3

@ -0,0 +1,171 @@
#!/usr/bin/env python3
import sys
import time
import sqlite3
import requests
import logging
import datetime
import plotly.express as px
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Optional
log = logging.getLogger(__name__)
@dataclass
class Post:
hash: str
inserted_at: datetime.datetime
@dataclass
class GelbooruCursor:
query: str
page: int = 0
count: Optional[int] = None
def __iter__(self):
return self
def __next__(self) -> List[Post]:
resp = requests.get(
"https://gelbooru.com/index.php",
params={
"page": "dapi",
"s": "post",
"json": "1",
"q": "index",
"tags": self.query,
"limit": "100",
"pid": self.page,
},
)
log.debug("made request to %r", resp.url)
assert resp.status_code == 200
rjson = resp.json()
attrs = rjson["@attributes"]
if "post" not in rjson:
log.info("page %d reached end of tag", self.page)
raise StopIteration()
self.count = self.count or attrs["count"]
log.info(
"page %d gave %d posts (total %d)",
self.page,
len(rjson["post"]),
self.count,
)
results = []
for entry in rjson["post"]:
parsed_time = time.strptime(entry["created_at"], "%a %b %d %H:%M:%S %z %Y")
results.append(
Post(
entry["md5"],
datetime.datetime.fromtimestamp(time.mktime(parsed_time)),
)
)
self.page += 1
return results
@dataclass
class Gelbooru:
typeid = 1
name = "Gelbooru"
def fetchall(self, query: str) -> GelbooruCursor:
return GelbooruCursor(query)
def main():
logging.basicConfig(level=logging.DEBUG)
log.debug("%r", sys.argv)
try:
booru = sys.argv[1]
tags = sys.argv[2]
except IndexError:
log.error("expected booru and tags argument")
return 1
if booru == "gelbooru":
booru_client = Gelbooru()
elif booru == "gelbooru":
booru_client = Danbooru()
raise NotImplementedError() # TODO
else:
log.error("booru must be one of {gelbooru, danbooru}")
return 1
db = sqlite3.connect("./timeliner-cache.db")
db.executescript(
"""
CREATE TABLE IF NOT EXISTS file_store (
booru_type text not null,
query text not null,
file_hash text not null,
inserted_at int not null,
constraint file_store_pk primary key (booru_type, query, file_hash)
) strict;
"""
)
try:
cur = db.execute(
"select file_hash, inserted_at from file_store where booru_type = ? and query = ?",
(booru_client.typeid, tags),
)
post_entries = cur.fetchall()
if not post_entries:
cursor = booru_client.fetchall(tags)
posts = [] # final data
for incoming_posts in cursor:
posts.extend(incoming_posts)
for post in incoming_posts:
db.execute(
"insert into file_store values (?, ?, ?, ?)",
(
booru_client.typeid,
tags,
post.hash,
post.inserted_at.timestamp(),
),
)
log.info("fetched %d posts", len(posts))
db.commit()
else:
posts = [
Post(entry[0], datetime.datetime.fromtimestamp(entry[1]))
for entry in post_entries
]
log.info("cached %d posts", len(posts))
finally:
db.close()
# now that we have data, bucket and plot?
# bucket data by day
buckets = defaultdict(int)
for post in posts:
date = (post.inserted_at.year, post.inserted_at.month, post.inserted_at.day)
buckets[date] += 1
post_frequencies = [
(f"{k[0]}/{k[1]}/{k[2]}", buckets[k]) for k in sorted(list(buckets.keys()))
]
fig = px.line(
post_frequencies,
x=0,
y=1,
title=f"amount of posts per day for given query ({tags}) in booru ({booru_client.name})",
)
fig.show()
return 0
if __name__ == "__main__":
sys.exit(main())
Loading…
Cancel
Save