Compare commits

No commits in common. "930b48e55b1b010c423cfc018c1cead1e152c79c" and "77466bab7c704ee4a0829d8d13ff9bd2bc0a19d5" have entirely different histories.

2 changed files with 121 additions and 116 deletions

@@ -61,7 +61,7 @@ async def main_with_ctx(ctx, wanted_date):
     }
     output_compressed_paths = {}
-    output_fd = {}
+    output_uncompressed_paths = {}
     for url_type, url in urls.items():
         parsed = urlparse(url)
@@ -69,6 +69,10 @@ async def main_with_ctx(ctx, wanted_date):
         output_path = Path.cwd() / parsed_path.name
         output_compressed_paths[url_type] = output_path
+        original_name, original_extension, _gz = parsed_path.name.split(".")
+        output_uncompressed = Path.cwd() / f"{original_name}.{original_extension}"
+        output_uncompressed_paths[url_type] = output_uncompressed
     for url_type, url in urls.items():
         output_path = output_compressed_paths[url_type]
         if output_path.exists():
@@ -104,7 +108,16 @@ async def main_with_ctx(ctx, wanted_date):
     # decompress
     for url_type, _url in urls.items():
         input_path = output_compressed_paths[url_type]
-        output_fd[url_type] = gzip.open(input_path, "rt")
+        output_path = output_uncompressed_paths[url_type]
+        if output_path.exists():
+            log.info("decompressed file %s already exists, ignoring", output_path)
+            continue
+        log.info("decompressing %s into %s", input_path.name, output_path.name)
+        with gzip.open(input_path, "rb") as in_fd:
+            with output_path.open(mode="wb") as out_fd:
+                shutil.copyfileobj(in_fd, out_fd)

     # now that everythings downloaded, compile the db
     await ctx.db.executescript(
@@ -140,113 +153,100 @@ async def main_with_ctx(ctx, wanted_date):
     )
     await ctx.db.commit()

-    work_done = False
-    tags_csv_fd = output_fd["tags"]
-    work_done = work_done or await work_tags(ctx, tags_csv_fd)
-    log.info("going to process posts")
-    posts_csv_fd = output_fd["posts"]
-    any_posts = await work_posts(ctx, posts_csv_fd)
-    work_done = work_done or any_posts
-    if work_done:
-        log.info("vacuuming db...")
-        await ctx.db.execute("vacuum")
-    log.info("database built")
-
-
-async def work_tags(ctx, tags_csv_fd):
     tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
     tag_count = tag_count_rows[0][0]
     log.info("already have %d tags", tag_count)
-    line_count = 0
-    for line in tags_csv_fd:
-        line_count += 1
-    line_count -= 1  # remove header
-    log.info("%d tags to import", line_count)
-    if line_count == tag_count:
-        log.info("same counts, not going to reimport")
-        return False
-    else:
-        tags_csv_fd.seek(0)
-        tags_reader = csv.reader(tags_csv_fd)
-        assert len(next(tags_reader)) == 4
-        processed_count = 0
-        processed_ratio = 0
-        for row in tags_reader:
-            tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
-            await ctx.db.execute(
-                "insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
-                (tag.id, tag.name, tag.category, tag.post_count),
-            )
-            processed_count += 1
-            new_processed_ratio = round((processed_count / line_count) * 100)
-            if new_processed_ratio != processed_ratio:
-                log.info("tags processed at %d%%", processed_ratio)
-                processed_ratio = new_processed_ratio
-        log.info("tags commit...")
-        await ctx.db.commit()
-        log.info("tags done...")
-        return True
-
-
-async def work_posts(ctx, posts_csv_fd):
+    work_done = False
+
+    with output_uncompressed_paths["tags"].open(
+        mode="r", encoding="utf-8"
+    ) as tags_csv_fd:
+        line_count = 0
+        for line in tags_csv_fd:
+            line_count += 1
+        line_count -= 1  # remove header
+        log.info("%d tags to import", line_count)
+
+        if line_count == tag_count:
+            log.info("same counts, not going to reimport")
+        else:
+            tags_csv_fd.seek(0)
+            tags_reader = csv.reader(tags_csv_fd)
+            assert len(next(tags_reader)) == 4
+
+            processed_count = 0
+            processed_ratio = 0
+
+            for row in tags_reader:
+                tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
+                await ctx.db.execute(
+                    "insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
+                    (tag.id, tag.name, tag.category, tag.post_count),
+                )
+                processed_count += 1
+                new_processed_ratio = round((processed_count / line_count) * 100)
+                if new_processed_ratio != processed_ratio:
+                    log.info("tags processed at %d%%", processed_ratio)
+                    processed_ratio = new_processed_ratio
+            log.info("tags done")
+            work_done = True
+
+    await ctx.db.commit()
+
+    log.info("going to process posts")
     post_count_rows = await ctx.db.execute_fetchall("select count(*) from posts")
     post_count = post_count_rows[0][0]
     log.info("already have %d posts", post_count)
-    line_count = 0
-    counter_reader = csv.reader(posts_csv_fd)
-    for _row in counter_reader:
-        line_count += 1
-    line_count -= 1  # remove header
+    with output_uncompressed_paths["posts"].open(
+        mode="r", encoding="utf-8"
+    ) as posts_csv_fd:
+        line_count = 0
+        counter_reader = csv.reader(posts_csv_fd)
+        for _row in counter_reader:
+            line_count += 1
+        line_count -= 1  # remove header
         log.info("%d posts to import", line_count)
         if line_count == post_count:
            log.info("already imported everything, skipping")
-            return False
-    else:
-        posts_csv_fd.seek(0)
-        posts_reader = csv.DictReader(posts_csv_fd)
+        else:
+            posts_csv_fd.seek(0)
+            posts_reader = csv.DictReader(posts_csv_fd)
             processed_count = 0
             processed_ratio = 0.0
             for row in posts_reader:
                 created_at_str = row["created_at"]
                 created_at = datetime.strptime(
                     created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
                 )
                 post = Post(
                     id=int(row["id"]),
                     uploader_id=int(row["uploader_id"]),
                     created_at=int(created_at.timestamp()),
                     md5=row["md5"],
                     source=row["source"],
                     rating=row["rating"],
                     tag_string=row["tag_string"],
                     is_deleted=e621_bool(row["is_deleted"]),
                     is_pending=e621_bool(row["is_pending"]),
                     is_flagged=e621_bool(row["is_flagged"]),
                     score=int(row["score"]),
                     up_score=int(row["up_score"]),
                     down_score=int(row["down_score"]),
                     is_rating_locked=e621_bool(row["is_rating_locked"]),
                 )
                 await ctx.db.execute(
                     """
                 insert into posts (
                     id,
                     uploader_id,
@@ -264,33 +264,38 @@ async def work_posts(ctx, posts_csv_fd):
                     is_rating_locked
                 ) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?) on conflict do nothing
                     """,
                     (
                         post.id,
                         post.uploader_id,
                         post.created_at,
                         post.md5,
                         post.source,
                         post.rating,
                         post.tag_string,
                         post.is_deleted,
                         post.is_pending,
                         post.is_flagged,
                         post.score,
                         post.up_score,
                         post.down_score,
                         post.is_rating_locked,
                     ),
                 )
                 processed_count += 1
                 new_processed_ratio = round((processed_count / line_count) * 100, 2)
                 if str(new_processed_ratio) != str(processed_ratio):
                     log.info("posts processed at %.2f%%", processed_ratio)
                     processed_ratio = new_processed_ratio
-        log.info("posts commit...")
-        await ctx.db.commit()
-        log.info("posts done")
-        return True
+            log.info("posts done")
+            work_done = True
+
+    await ctx.db.commit()
+
+    if work_done:
+        log.info("vacuuming db...")
+        await ctx.db.execute("vacuum")
+    log.info("database built")


 async def main():

@@ -1,3 +1,3 @@
-aiohttp==3.9.1
-aiosqlite==0.19.0
-Quart==0.19.4
+aiohttp==3.8.1
+aiosqlite==0.17.0
+Quart==0.18.0