# tagger-showdown/main.py
import time
import os
import json
import aiohttp
import decimal
import aiofiles
import sys
import asyncio
import aiosqlite
import base64
import logging
import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import plotly.io as pio
from collections import defaultdict, Counter
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Optional, List, Set, Tuple, Dict
from aiolimiter import AsyncLimiter
from dataclasses import dataclass, field
log = logging.getLogger(__name__)
# from https://github.com/picobyte/stable-diffusion-webui-wd14-tagger/blob/49cfdcd6be77086e6cc0d26b5ec26605b702be6e/tagger/utils.py#L15C1-L58C1
DEFAULTS = [
"wd14-vit.v1",
"wd14-vit.v2",
"wd14-convnext.v1",
"wd14-convnext.v2",
"wd14-convnextv2.v1",
# "wd14-swinv2-v1",
"wd-v1-4-moat-tagger.v2",
"mld-caformer.dec-5-97527",
# broken model: "mld-tresnetd.6-30000",
]
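
# A single cached interrogation result: the parsed tag list plus how long
# the model took, as recorded by fight mode.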
@dataclass
class InterrogatorPost:
tags: List[str]
time_taken: float
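
# Base interface for a tagger backend. fetch() reads a cached result from
# sqlite; subclasses implement interrogate() against their actual service.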
@dataclass
class Interrogator:
model_id: str
    address: Optional[str]
def _process(self, lst):
return lst
async def fetch(self, ctx, md5_hash):
rows = await ctx.db.execute_fetchall(
"select output_tag_string, time_taken from interrogated_posts where md5 = ? and model_name = ?",
(md5_hash, self.model_id),
)
if not rows:
raise AssertionError("run fight mode first, there are missing posts..")
        tag_string, time_taken = rows[0]
return InterrogatorPost(self._process(tag_string.split()), time_taken)
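
# Backend reached over a bare multipart-upload HTTP API (the dd_* entries in
# config.json). Its rating:* outputs are normalized to plain Danbooru rating
# tags ("safe" becomes "general") so they score like any other tag.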
class DDInterrogator(Interrogator):
def _process(self, lst):
new_lst = []
for tag in lst:
if tag.startswith("rating:"):
original_danbooru_tag = tag.split(":")[1]
else:
original_danbooru_tag = tag
if original_danbooru_tag == "safe":
original_danbooru_tag = "general"
new_lst.append(original_danbooru_tag)
return new_lst
    async def interrogate(self, ctx, path):
        # read the image up front so the file handle is closed deterministically
        # instead of being leaked into the request body
        async with aiofiles.open(path, "rb") as fd:
            file_bytes = await fd.read()
        form = aiohttp.FormData()
        form.add_field("file", file_bytes, filename=path.name)
        async with ctx.session.post(
            f"{self.address}/",
            params={"threshold": "0.7"},
            headers={"Authorization": "Bearer sex"},
            data=form,
        ) as resp:
assert resp.status == 200
tags = await resp.json()
upstream_tags = [tag.replace(" ", "_") for tag in tags]
return " ".join(upstream_tags)
class SDInterrogator(Interrogator):
async def interrogate(self, ctx, path):
async with aiofiles.open(path, "rb") as fd:
as_base64 = base64.b64encode(await fd.read()).decode("utf-8")
async with ctx.session.post(
f"{self.address}/tagger/v1/interrogate",
json={
"model": self.model_id,
"threshold": 0.7,
"image": as_base64,
},
) as resp:
log.info("got %d", resp.status)
assert resp.status == 200
            data = await resp.json()
            # the response maps tag -> confidence weight; only the tag names are kept
            tags = data["caption"].keys()
            upstream_tags = [tag.replace(" ", "_") for tag in tags]
            return " ".join(upstream_tags)
class ControlInterrogator(Interrogator):
async def interrogate(self, ctx, path):
md5_hash = Path(path).stem
post = await fetch_post(ctx, md5_hash)
return post["tag_string"]
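
# Runtime configuration, loaded from config.json. An illustrative example
# (the addresses, model names, and extras mapping are placeholder values):
#
#   {
#     "sd_webui_address": "http://localhost:7860",
#     "dd_address": "http://localhost:7878",
#     "dd_model_name": "deepdanbooru-v3",
#     "sd_webui_extras": {"wd14-vit.v2": "http://localhost:7861"},
#     "sd_webui_models": ["wd14-vit.v1", "wd14-vit.v2"]
#   }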
@dataclass
class Config:
sd_webui_address: str
dd_address: str
dd_model_name: str
sd_webui_extras: Dict[str, str]
sd_webui_models: List[str] = field(default_factory=lambda: list(DEFAULTS))
@property
    def all_available_models(self) -> List[Interrogator]:
return (
[
SDInterrogator(sd_interrogator, self.sd_webui_address)
for sd_interrogator in self.sd_webui_models
]
+ [
SDInterrogator(sd_interrogator, url)
for sd_interrogator, url in self.sd_webui_extras.items()
]
+ [DDInterrogator(self.dd_model_name, self.dd_address)]
+ [ControlInterrogator("control", None)]
)
@dataclass
class Context:
db: Any
session: aiohttp.ClientSession
config: Config
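
# Minimal booru client scaffolding: shared aiohttp session, rate limiters,
# and per-file/per-tag locks.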
@dataclass
class Booru:
session: aiohttp.ClientSession
limiter: AsyncLimiter
tag_limiter: AsyncLimiter
file_locks: dict = field(default_factory=lambda: defaultdict(asyncio.Lock))
tag_locks: dict = field(default_factory=lambda: defaultdict(asyncio.Lock))
    fetch_type = "hash"  # posts are addressed by md5 hash
class Danbooru(Booru):
title = "Danbooru"
base_url = "https://danbooru.donmai.us"
async def posts(self, tag_query: str, limit, page: int):
log.info("%s: submit %r", self.title, tag_query)
async with self.limiter:
log.info("%s: submit upstream %r", self.title, tag_query)
async with self.session.get(
f"{self.base_url}/posts.json",
params={"tags": tag_query, "limit": limit, "page": page},
) as resp:
assert resp.status == 200
rjson = await resp.json()
return rjson
DOWNLOADS = Path.cwd() / "posts"
DOWNLOADS.mkdir(exist_ok=True)
ALLOWED_EXTENSIONS = (
"jpg",
"jpeg",
"png",
)
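
# Mode "download_images": pull a page of posts from Danbooru and store the
# image files under ./posts plus the raw post JSON in sqlite.
# argv: main.py download_images [tag_query] [limit] [page]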
async def download_images(ctx):
try:
tagquery = sys.argv[2]
except IndexError:
tagquery = ""
try:
limit = int(sys.argv[3])
except IndexError:
limit = 30
try:
pageskip = int(sys.argv[4])
except IndexError:
pageskip = 150
danbooru = Danbooru(
ctx.session,
AsyncLimiter(1, 10),
AsyncLimiter(1, 3),
)
posts = await danbooru.posts(tagquery, limit, pageskip)
for post in posts:
if "md5" not in post:
continue
md5 = post["md5"]
log.info("processing post %r", md5)
existing_post = await fetch_post(ctx, md5)
if existing_post:
log.info("already exists %r", md5)
continue
# download the post
post_file_url = post["file_url"]
post_url_path = Path(urlparse(post_file_url).path)
        extension = post_url_path.suffix.lstrip(".").lower()
        if extension not in ALLOWED_EXTENSIONS:
            log.info("ignoring %r, invalid extension (%r)", md5, post_url_path.suffix)
            continue
post_filename = post_url_path.name
post_filepath = DOWNLOADS / post_filename
        if not post_filepath.exists():
            log.info("downloading %r to %r", post_file_url, post_filepath)
            async with ctx.session.get(post_file_url) as resp:
                assert resp.status == 200
                async with aiofiles.open(post_filepath, "wb") as fd:
                    async for chunk in resp.content.iter_chunked(1024):
                        await fd.write(chunk)
# when it's done successfully, insert it
await insert_post(ctx, post)
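
# Load a stored post by md5 and append its rating ("g"/"s"/"q"/"e") as an
# extra human-readable tag, so rating predictions are scored like tags.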
async def fetch_post(ctx, md5) -> Optional[dict]:
rows = await ctx.db.execute_fetchall("select data from posts where md5 = ?", (md5,))
if not rows:
return None
assert len(rows) == 1
post = json.loads(rows[0][0])
post_rating = post["rating"]
match post_rating:
case "g":
rating_tag = "general"
case "s":
rating_tag = "sensitive"
case "q":
rating_tag = "questionable"
case "e":
rating_tag = "explicit"
case _:
            raise AssertionError(f"invalid post rating {post_rating!r}")
post["tag_string"] = post["tag_string"] + " " + rating_tag
return post
async def insert_post(ctx, post):
await ctx.db.execute_insert(
"insert into posts (md5, filepath ,data) values (?, ?,?)",
(
post["md5"],
Path(urlparse(post["file_url"]).path).name,
json.dumps(post),
),
)
await ctx.db.commit()
async def insert_interrogated_result(
ctx, interrogator: Interrogator, md5: str, tag_string: str, time_taken: float
):
await ctx.db.execute_insert(
"insert into interrogated_posts (md5, model_name, output_tag_string, time_taken) values (?,?,?,?)",
(md5, interrogator.model_id, tag_string, time_taken),
)
await ctx.db.commit()
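
# Mode "fight": run every configured model over every downloaded post it has
# not seen yet, caching the output tag string and runtime in sqlite.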
async def fight(ctx):
interrogators = ctx.config.all_available_models
all_rows = await ctx.db.execute_fetchall("select md5 from posts")
all_hashes = set(r[0] for r in all_rows)
for interrogator in interrogators:
log.info("processing for %r", interrogator)
# calculate set of images we didn't interrogate yet
interrogated_rows = await ctx.db.execute_fetchall(
"select md5 from interrogated_posts where model_name = ?",
(interrogator.model_id,),
)
interrogated_hashes = set(row[0] for row in interrogated_rows)
missing_hashes = all_hashes - interrogated_hashes
log.info("missing %d hashes", len(missing_hashes))
        for index, missing_hash in enumerate(missing_hashes, start=1):
            log.info(
                "interrogating %r (%d/%d)", missing_hash, index, len(missing_hashes)
            )
)
post_filepath = next(DOWNLOADS.glob(f"{missing_hash}*"))
start_ts = time.monotonic()
tag_string = await interrogator.interrogate(ctx, post_filepath)
end_ts = time.monotonic()
time_taken = round(end_ts - start_ts, 10)
log.info("took %.5fsec, got %r", time_taken, tag_string)
await insert_interrogated_result(
ctx, interrogator, missing_hash, tag_string, time_taken
)
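
# Per-post score: (correct tags - hallucinated tags) / total Danbooru tags.
# A perfect match scores 1.0; a model that emits more wrong tags than right
# ones goes negative. Missed tags only lower the numerator.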
def score(
danbooru_tags: Set[str], interrogator_tags: Set[str]
) -> Tuple[decimal.Decimal, Set[str]]:
tags_in_danbooru = danbooru_tags.intersection(interrogator_tags)
tags_not_in_danbooru = interrogator_tags - danbooru_tags
return (
round(
decimal.Decimal(len(tags_in_danbooru) - len(tags_not_in_danbooru))
/ decimal.Decimal(len(danbooru_tags)),
10,
),
tags_not_in_danbooru,
)
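
# Mode "scores": compare each model's cached output against the real Danbooru
# tags, print a ranking with runtimes, and write plots to ./plots.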
async def scores(ctx):
interrogators = ctx.config.all_available_models
all_rows = await ctx.db.execute_fetchall("select md5 from posts")
all_hashes = set(r[0] for r in all_rows)
model_scores = defaultdict(dict)
runtimes = defaultdict(list)
incorrect_tags_counters = defaultdict(Counter)
for md5_hash in all_hashes:
log.info("processing for %r", md5_hash)
post = await fetch_post(ctx, md5_hash)
danbooru_tags = set(post["tag_string"].split())
for interrogator in interrogators:
post_data = await interrogator.fetch(ctx, md5_hash)
runtimes[interrogator.model_id].append(post_data.time_taken)
interrogator_tags = set(post_data.tags)
tagging_score, incorrect_tags = score(danbooru_tags, interrogator_tags)
for tag in incorrect_tags:
incorrect_tags_counters[interrogator.model_id][tag] += 1
model_scores[interrogator.model_id][md5_hash] = {
"score": tagging_score,
"incorrect_tags": incorrect_tags,
}
summed_scores = {
model_id: sum(d["score"] for d in post_scores.values())
for model_id, post_scores in model_scores.items()
}
normalized_scores = {
model: round(summed_scores[model] / len(all_hashes), 10)
for model in summed_scores
}
print("scores are [worst...best]")
for model in sorted(
normalized_scores.keys(),
key=lambda model: normalized_scores[model],
reverse=True,
):
average_runtime = sum(runtimes[model]) / len(runtimes[model])
print(model, normalized_scores[model], "runtime", average_runtime, "sec")
if os.getenv("SHOWOFF", "0") == "1":
print("[", end="")
for bad_md5_hash in sorted(
model_scores[model].keys(),
key=lambda md5_hash: model_scores[model][md5_hash]["score"],
)[:4]:
data = model_scores[model][bad_md5_hash]
if os.getenv("DEBUG", "0") == "1":
                    print(bad_md5_hash, data["score"], " ".join(data["incorrect_tags"]))
else:
print(data["score"], end=",")
print("...", end="")
            for good_md5_hash in sorted(
                model_scores[model].keys(),
                key=lambda md5_hash: model_scores[model][md5_hash]["score"],
            )[-4:]:
data = model_scores[model][good_md5_hash]
print(data["score"], end=",")
print("]")
print("most incorrect tags", incorrect_tags_counters[model].most_common(5))
PLOTS = Path.cwd() / "plots"
PLOTS.mkdir(exist_ok=True)
log.info("plotting score histogram...")
    data_for_df = {"scores": [], "model": []}
for model in sorted(
normalized_scores.keys(),
key=lambda model: normalized_scores[model],
reverse=True,
):
for post_score in (d["score"] for d in model_scores[model].values()):
data_for_df["scores"].append(post_score)
data_for_df["model"].append(model)
df = pd.DataFrame(data_for_df)
fig = px.histogram(
df,
x="scores",
color="model",
histfunc="count",
marginal="rug",
histnorm="probability",
)
pio.write_image(fig, PLOTS / "score_histogram.png", width=1024, height=800)
log.info("plotting positive histogram...")
plot2(PLOTS / "positive_score_histogram.png", normalized_scores, model_scores)
log.info("plotting error rates...")
plot3(PLOTS / "error_rate.png", normalized_scores, model_scores)
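
# Same probability histogram as the main score plot, restricted to posts
# with a non-negative score.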
def plot2(output_path, normalized_scores, model_scores):
    data_for_df = {"scores": [], "model": []}
for model in sorted(
normalized_scores.keys(),
key=lambda model: normalized_scores[model],
reverse=True,
):
for post_score in (d["score"] for d in model_scores[model].values()):
if post_score < 0:
continue
data_for_df["scores"].append(post_score)
data_for_df["model"].append(model)
df = pd.DataFrame(data_for_df)
fig = px.histogram(
df,
x="scores",
color="model",
histfunc="count",
marginal="rug",
histnorm="probability",
)
pio.write_image(fig, output_path, width=1024, height=800)
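
# Grouped bar chart of absolute error counts per model; "practical error"
# is incorrect tags minus incorrect rating tags.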
def plot3(output_path, normalized_scores, model_scores):
data_for_df = {
"model": [],
"errors": [],
"rating_errors": [],
"practical_errors": [],
}
for model in sorted(
normalized_scores.keys(),
key=lambda model: normalized_scores[model],
reverse=True,
):
total_incorrect_tags = 0
total_rating_errors = 0
for score_data in model_scores[model].values():
total_incorrect_tags += len(score_data["incorrect_tags"])
total_rating_errors += sum(
1
for rating in ["general", "sensitive", "questionable", "explicit"]
if rating in score_data["incorrect_tags"]
)
practical_absolute_error = total_incorrect_tags - total_rating_errors
data_for_df["errors"].append(total_incorrect_tags)
data_for_df["rating_errors"].append(total_rating_errors)
data_for_df["practical_errors"].append(practical_absolute_error)
data_for_df["model"].append(model)
df = pd.DataFrame(data_for_df)
fig = go.Figure(
data=[
go.Bar(name="incorrect tags", x=df.model, y=df.errors),
go.Bar(name="incorrect ratings", x=df.model, y=df.rating_errors),
go.Bar(name="practical error", x=df.model, y=df.practical_errors),
]
)
pio.write_image(fig, output_path, width=1024, height=800)
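
# Create the tables if needed and dispatch on the CLI mode:
#   main.py {download_images,fight,scores} [mode args...]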
async def realmain(ctx):
await ctx.db.executescript(
"""
create table if not exists posts (
md5 text primary key,
filepath text,
data text
);
create table if not exists interrogated_posts (
md5 text,
model_name text not null,
output_tag_string text not null,
time_taken real not null,
primary key (md5, model_name)
);
"""
)
    try:
        mode = sys.argv[1]
    except IndexError:
        raise Exception("must provide a mode: download_images, fight, or scores")
if mode == "download_images":
await download_images(ctx)
elif mode == "fight":
await fight(ctx)
elif mode == "scores":
await scores(ctx)
else:
raise AssertionError(f"invalid mode {mode}")
async def main():
CONFIG_PATH = Path.cwd() / "config.json"
log.info("hewo")
async with aiosqlite.connect("./data.db") as db:
async with aiohttp.ClientSession() as session:
with CONFIG_PATH.open() as config_fd:
config_json = json.load(config_fd)
config = Config(**config_json)
ctx = Context(db, session, config)
await realmain(ctx)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())