From b0c729789f9067c1759032dc16adcd6df1fa7aab Mon Sep 17 00:00:00 2001
From: Luna
Date: Fri, 22 Dec 2023 21:21:41 -0300
Subject: [PATCH] streaming decompression for less storage use

---
 build_database.py | 231 +++++++++++++++++++++++-----------------------
 1 file changed, 113 insertions(+), 118 deletions(-)

diff --git a/build_database.py b/build_database.py
index 5917a7d..c91621e 100644
--- a/build_database.py
+++ b/build_database.py
@@ -61,7 +61,7 @@ async def main_with_ctx(ctx, wanted_date):
     }
 
     output_compressed_paths = {}
-    output_uncompressed_paths = {}
+    output_fd = {}
 
     for url_type, url in urls.items():
         parsed = urlparse(url)
@@ -69,10 +69,6 @@ async def main_with_ctx(ctx, wanted_date):
         output_path = Path.cwd() / parsed_path.name
         output_compressed_paths[url_type] = output_path
 
-        original_name, original_extension, _gz = parsed_path.name.split(".")
-        output_uncompressed = Path.cwd() / f"{original_name}.{original_extension}"
-        output_uncompressed_paths[url_type] = output_uncompressed
-
     for url_type, url in urls.items():
         output_path = output_compressed_paths[url_type]
         if output_path.exists():
@@ -108,16 +104,7 @@ async def main_with_ctx(ctx, wanted_date):
     # decompress
     for url_type, _url in urls.items():
         input_path = output_compressed_paths[url_type]
-        output_path = output_uncompressed_paths[url_type]
-
-        if output_path.exists():
-            log.info("decompressed file %s already exists, ignoring", output_path)
-            continue
-
-        log.info("decompressing %s into %s", input_path.name, output_path.name)
-        with gzip.open(input_path, "rb") as in_fd:
-            with output_path.open(mode="wb") as out_fd:
-                shutil.copyfileobj(in_fd, out_fd)
+        output_fd[url_type] = gzip.open(input_path, "rt")
 
     # now that everythings downloaded, compile the db
     await ctx.db.executescript(
@@ -153,100 +140,113 @@ async def main_with_ctx(ctx, wanted_date):
     )
     await ctx.db.commit()
 
+    work_done = False
+
+    tags_csv_fd = output_fd["tags"]
+    work_done = work_done or await work_tags(ctx, tags_csv_fd)
+
+    log.info("going to process posts")
+
+    posts_csv_fd = output_fd["posts"]
+    any_posts = await work_posts(ctx, posts_csv_fd)
+    work_done = work_done or any_posts
+
+    if work_done:
+        log.info("vacuuming db...")
+        await ctx.db.execute("vacuum")
+    log.info("database built")
+
+
+async def work_tags(ctx, tags_csv_fd):
     tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
     tag_count = tag_count_rows[0][0]
     log.info("already have %d tags", tag_count)
 
-    work_done = False
+    line_count = 0
+    for line in tags_csv_fd:
+        line_count += 1
 
-    with output_uncompressed_paths["tags"].open(
-        mode="r", encoding="utf-8"
-    ) as tags_csv_fd:
-        line_count = 0
-        for line in tags_csv_fd:
-            line_count += 1
+    line_count -= 1  # remove header
 
-        line_count -= 1  # remove header
+    log.info("%d tags to import", line_count)
 
-        log.info("%d tags to import", line_count)
+    if line_count == tag_count:
+        log.info("same counts, not going to reimport")
+        return False
+    else:
+        tags_csv_fd.seek(0)
+        tags_reader = csv.reader(tags_csv_fd)
 
-        if line_count == tag_count:
-            log.info("same counts, not going to reimport")
-        else:
-            tags_csv_fd.seek(0)
-            tags_reader = csv.reader(tags_csv_fd)
+        assert len(next(tags_reader)) == 4
 
-            assert len(next(tags_reader)) == 4
+        processed_count = 0
+        processed_ratio = 0
 
-            processed_count = 0
-            processed_ratio = 0
+        for row in tags_reader:
+            tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
+            await ctx.db.execute(
+                "insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
+                (tag.id, tag.name, tag.category, tag.post_count),
+            )
+            processed_count += 1
+            new_processed_ratio = round((processed_count / line_count) * 100)
+            if new_processed_ratio != processed_ratio:
+                log.info("tags processed at %d%%", processed_ratio)
+                processed_ratio = new_processed_ratio
 
-            for row in tags_reader:
-                tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
-                await ctx.db.execute(
-                    "insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
-                    (tag.id, tag.name, tag.category, tag.post_count),
-                )
-                processed_count += 1
-                new_processed_ratio = round((processed_count / line_count) * 100)
-                if new_processed_ratio != processed_ratio:
-                    log.info("tags processed at %d%%", processed_ratio)
-                    processed_ratio = new_processed_ratio
+        log.info("tags commit...")
+        await ctx.db.commit()
+        log.info("tags done...")
+        return True
 
-            log.info("tags done")
-            work_done = True
-    await ctx.db.commit()
-
-    log.info("going to process posts")
 
+async def work_posts(ctx, posts_csv_fd):
     post_count_rows = await ctx.db.execute_fetchall("select count(*) from posts")
     post_count = post_count_rows[0][0]
     log.info("already have %d posts", post_count)
 
-    with output_uncompressed_paths["posts"].open(
-        mode="r", encoding="utf-8"
-    ) as posts_csv_fd:
-        line_count = 0
-        counter_reader = csv.reader(posts_csv_fd)
-        for _row in counter_reader:
-            line_count += 1
-        line_count -= 1  # remove header
+    line_count = 0
+    counter_reader = csv.reader(posts_csv_fd)
+    for _row in counter_reader:
+        line_count += 1
+    line_count -= 1  # remove header
 
-        log.info("%d posts to import", line_count)
-        if line_count == post_count:
-            log.info("already imported everything, skipping")
-        else:
-            posts_csv_fd.seek(0)
-            posts_reader = csv.DictReader(posts_csv_fd)
+    log.info("%d posts to import", line_count)
+    if line_count == post_count:
+        log.info("already imported everything, skipping")
+        return False
+    else:
+        posts_csv_fd.seek(0)
+        posts_reader = csv.DictReader(posts_csv_fd)
 
-            processed_count = 0
-            processed_ratio = 0.0
+        processed_count = 0
+        processed_ratio = 0.0
 
-            for row in posts_reader:
-                created_at_str = row["created_at"]
-                created_at = datetime.strptime(
-                    created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
-                )
+        for row in posts_reader:
+            created_at_str = row["created_at"]
+            created_at = datetime.strptime(
+                created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
+            )
 
-                post = Post(
-                    id=int(row["id"]),
-                    uploader_id=int(row["uploader_id"]),
-                    created_at=int(created_at.timestamp()),
-                    md5=row["md5"],
-                    source=row["source"],
-                    rating=row["rating"],
-                    tag_string=row["tag_string"],
-                    is_deleted=e621_bool(row["is_deleted"]),
-                    is_pending=e621_bool(row["is_pending"]),
-                    is_flagged=e621_bool(row["is_flagged"]),
-                    score=int(row["score"]),
-                    up_score=int(row["up_score"]),
-                    down_score=int(row["down_score"]),
-                    is_rating_locked=e621_bool(row["is_rating_locked"]),
-                )
+            post = Post(
+                id=int(row["id"]),
+                uploader_id=int(row["uploader_id"]),
+                created_at=int(created_at.timestamp()),
+                md5=row["md5"],
+                source=row["source"],
+                rating=row["rating"],
+                tag_string=row["tag_string"],
+                is_deleted=e621_bool(row["is_deleted"]),
+                is_pending=e621_bool(row["is_pending"]),
+                is_flagged=e621_bool(row["is_flagged"]),
+                score=int(row["score"]),
+                up_score=int(row["up_score"]),
+                down_score=int(row["down_score"]),
+                is_rating_locked=e621_bool(row["is_rating_locked"]),
+            )
 
-                await ctx.db.execute(
-                    """
+            await ctx.db.execute(
+                """
             insert into posts (
                 id,
                 uploader_id,
@@ -264,38 +264,33 @@ async def main_with_ctx(ctx, wanted_date):
                 is_rating_locked
             ) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?) on conflict do nothing
             """,
-                    (
-                        post.id,
-                        post.uploader_id,
-                        post.created_at,
-                        post.md5,
-                        post.source,
-                        post.rating,
-                        post.tag_string,
-                        post.is_deleted,
-                        post.is_pending,
-                        post.is_flagged,
-                        post.score,
-                        post.up_score,
-                        post.down_score,
-                        post.is_rating_locked,
-                    ),
-                )
-                processed_count += 1
-                new_processed_ratio = round((processed_count / line_count) * 100, 2)
-                if str(new_processed_ratio) != str(processed_ratio):
-                    log.info("posts processed at %.2f%%", processed_ratio)
-                    processed_ratio = new_processed_ratio
+                (
+                    post.id,
+                    post.uploader_id,
+                    post.created_at,
+                    post.md5,
+                    post.source,
+                    post.rating,
+                    post.tag_string,
+                    post.is_deleted,
+                    post.is_pending,
+                    post.is_flagged,
+                    post.score,
+                    post.up_score,
+                    post.down_score,
+                    post.is_rating_locked,
+                ),
+            )
+            processed_count += 1
+            new_processed_ratio = round((processed_count / line_count) * 100, 2)
+            if str(new_processed_ratio) != str(processed_ratio):
+                log.info("posts processed at %.2f%%", processed_ratio)
+                processed_ratio = new_processed_ratio
 
-            log.info("posts done")
-            work_done = True
-
-    await ctx.db.commit()
-
-    if work_done:
-        log.info("vacuuming db...")
-        await ctx.db.execute("vacuum")
-    log.info("database built")
+        log.info("posts commit...")
+        await ctx.db.commit()
+        log.info("posts done")
+        return True
 
 
 async def main():
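Note for reviewers: a minimal sketch of the streaming pattern this patch switches to, assuming a hypothetical dump.csv.gz path rather than the real downloaded dump files. gzip.open(path, "rt") returns a text-mode file object that decompresses on the fly, so csv.reader can iterate rows without an uncompressed copy ever touching disk, and seek(0) rewinds the handle so one pass can count rows and a second pass can import them, which is what work_tags and work_posts do in the diff above.

    import csv
    import gzip

    # "dump.csv.gz" is a placeholder path; build_database.py streams the
    # downloaded e621 dump files the same way through output_fd[url_type].
    with gzip.open("dump.csv.gz", "rt") as fd:
        reader = csv.reader(fd)
        next(reader)                         # skip the CSV header row
        row_count = sum(1 for _ in reader)   # first pass: count data rows

        fd.seek(0)                           # rewind; decompression restarts from the top
        reader = csv.reader(fd)
        next(reader)                         # skip the header again
        for row in reader:
            pass                             # second pass: insert each row into the db

The trade-off is that every seek(0) re-decompresses from the beginning, so the count-then-import passes spend extra CPU instead of extra disk space.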