add post importing

Luna 2022-08-28 16:13:26 -03:00
parent c987eb7035
commit 3a8a3bab80
1 changed file with 125 additions and 9 deletions


@@ -7,6 +7,7 @@ import tempfile
 import sys
 from dataclasses import dataclass
 from urllib.parse import urlparse
+from datetime import datetime
 from pathlib import Path
 from typing import Any
 
@@ -30,6 +31,28 @@ class Tag:
     post_count: int
 
 
+@dataclass
+class Post:
+    id: int
+    uploader_id: int
+    created_at: int
+    md5: str
+    source: str
+    rating: str
+    tag_string: str
+    is_deleted: int
+    is_pending: int
+    is_flagged: int
+    score: int
+    up_score: int
+    down_score: int
+    is_rating_locked: int
+
+
+def e621_bool(text: str) -> bool:
+    return text == "t"
+
+
 async def main_with_ctx(ctx, wanted_date):
     urls = {
         "tags": f"https://e621.net/db_export/tags-{wanted_date}.csv.gz",
@@ -129,6 +152,10 @@ async def main_with_ctx(ctx, wanted_date):
         )
     await ctx.db.commit()
 
+    tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
+    tag_count = tag_count_rows[0][0]
+    log.info("already have %d tags", tag_count)
+
     with output_uncompressed_paths["tags"].open(
         mode="r", encoding="utf-8"
     ) as tags_csv_fd:
@@ -139,27 +166,116 @@ async def main_with_ctx(ctx, wanted_date):
             line_count -= 1  # remove header
         log.info("%d tags to import", line_count)
 
-        tags_csv_fd.seek(0)
-        tags_reader = csv.reader(tags_csv_fd)
-        assert len(next(tags_reader)) == 4
+        if line_count == tag_count:
+            log.info("same counts, not going to reimport")
+        else:
+            tags_csv_fd.seek(0)
+            tags_reader = csv.reader(tags_csv_fd)
+            assert len(next(tags_reader)) == 4
+
+            processed_count = 0
+            processed_ratio = 0
+            for row in tags_reader:
+                tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
+                await ctx.db.execute(
+                    "insert into tags (id, name, category, post_count) values (?, ?, ?, ?)",
+                    (tag.id, tag.name, tag.category, tag.post_count),
+                )
+                processed_count += 1
+                new_processed_ratio = round((processed_count / line_count) * 100)
+                if new_processed_ratio != processed_ratio:
+                    log.info("tags processed at %d%%", processed_ratio)
+                    processed_ratio = new_processed_ratio
+
+            log.info("tags done")
+            await ctx.db.commit()
+
+    with output_uncompressed_paths["posts"].open(
+        mode="r", encoding="utf-8"
+    ) as posts_csv_fd:
+        line_count = 0
+        for line in posts_csv_fd:
+            line_count += 1
+        line_count -= 1  # remove header
+
+        log.info("%d posts to import", line_count)
+
+        posts_csv_fd.seek(0)
+        posts_reader = csv.DictReader(posts_csv_fd)
 
         processed_count = 0
         processed_ratio = 0
-        for row in tags_reader:
-            tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
+        for row in posts_reader:
+            created_at_str = row["created_at"]
+            created_at = datetime.strptime(
+                created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
+            )
+            post = Post(
+                id=int(row["id"]),
+                uploader_id=int(row["uploader_id"]),
+                created_at=int(created_at.timestamp()),
+                md5=row["md5"],
+                source=row["source"],
+                rating=row["rating"],
+                tag_string=row["tag_string"],
+                is_deleted=e621_bool(row["is_deleted"]),
+                is_pending=e621_bool(row["is_pending"]),
+                is_flagged=e621_bool(row["is_flagged"]),
+                score=int(row["score"]),
+                up_score=int(row["up_score"]),
+                down_score=int(row["down_score"]),
+                is_rating_locked=e621_bool(row["is_rating_locked"]),
+            )
             await ctx.db.execute(
-                "insert into tags (id, name, category, post_count) values (?, ?, ?, ?)",
-                (tag.id, tag.name, tag.category, tag.post_count),
+                """
+                insert into posts (
+                    id,
+                    uploader_id,
+                    created_at,
+                    md5,
+                    source,
+                    rating,
+                    tag_string,
+                    is_deleted,
+                    is_pending,
+                    is_flagged,
+                    score,
+                    up_score,
+                    down_score,
+                    is_rating_locked
+                ) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+                """,
+                (
+                    post.id,
+                    post.uploader_id,
+                    post.created_at,
+                    post.md5,
+                    post.source,
+                    post.rating,
+                    post.tag_string,
+                    post.is_deleted,
+                    post.is_pending,
+                    post.is_flagged,
+                    post.score,
+                    post.up_score,
+                    post.down_score,
+                    post.is_rating_locked,
+                ),
             )
             processed_count += 1
             new_processed_ratio = round((processed_count / line_count) * 100)
             if new_processed_ratio != processed_ratio:
-                log.info("processed at %d%%", processed_ratio)
+                log.info("posts processed at %d%%", processed_ratio)
                 processed_ratio = new_processed_ratio
-        log.info("done")
+        log.info("posts done")
         await ctx.db.commit()
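
Note beyond the diff: the insert statement assumes a posts table that already exists by the time this code runs; its creation is not part of this commit. Below is a minimal sketch of a compatible SQLite schema plus a sample query, with column types inferred from how the importer binds values (integer flags for the e621_bool fields, an integer unix timestamp for created_at, text for the rest). The table layout, database file name, and query are illustrative assumptions, not the project's actual migration, and the stdlib sqlite3 module stands in for the async ctx.db connection the importer uses.

import sqlite3

# Hypothetical schema sketch matching the 14 columns the importer inserts.
POSTS_SCHEMA = """
create table if not exists posts (
    id integer primary key,
    uploader_id integer,
    created_at integer,
    md5 text,
    source text,
    rating text,
    tag_string text,
    is_deleted integer,
    is_pending integer,
    is_flagged integer,
    score integer,
    up_score integer,
    down_score integer,
    is_rating_locked integer
)
"""

db = sqlite3.connect("e621.db")  # illustrative path only
db.execute(POSTS_SCHEMA)

# Example query over the imported rows: highest-scoring posts that are not deleted.
for post_id, score, rating in db.execute(
    "select id, score, rating from posts where is_deleted = 0 order by score desc limit 5"
):
    print(post_id, score, rating)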