streaming decompression for less storage use
This commit is contained in:
parent
77466bab7c
commit
b0c729789f
1 changed files with 113 additions and 118 deletions
|
@ -61,7 +61,7 @@ async def main_with_ctx(ctx, wanted_date):
|
|||
}
|
||||
|
||||
output_compressed_paths = {}
|
||||
output_uncompressed_paths = {}
|
||||
output_fd = {}
|
||||
|
||||
for url_type, url in urls.items():
|
||||
parsed = urlparse(url)
|
||||
|
@ -69,10 +69,6 @@ async def main_with_ctx(ctx, wanted_date):
|
|||
output_path = Path.cwd() / parsed_path.name
|
||||
output_compressed_paths[url_type] = output_path
|
||||
|
||||
original_name, original_extension, _gz = parsed_path.name.split(".")
|
||||
output_uncompressed = Path.cwd() / f"{original_name}.{original_extension}"
|
||||
output_uncompressed_paths[url_type] = output_uncompressed
|
||||
|
||||
for url_type, url in urls.items():
|
||||
output_path = output_compressed_paths[url_type]
|
||||
if output_path.exists():
|
||||
|
@ -108,16 +104,7 @@ async def main_with_ctx(ctx, wanted_date):
|
|||
# decompress
|
||||
for url_type, _url in urls.items():
|
||||
input_path = output_compressed_paths[url_type]
|
||||
output_path = output_uncompressed_paths[url_type]
|
||||
|
||||
if output_path.exists():
|
||||
log.info("decompressed file %s already exists, ignoring", output_path)
|
||||
continue
|
||||
|
||||
log.info("decompressing %s into %s", input_path.name, output_path.name)
|
||||
with gzip.open(input_path, "rb") as in_fd:
|
||||
with output_path.open(mode="wb") as out_fd:
|
||||
shutil.copyfileobj(in_fd, out_fd)
|
||||
output_fd[url_type] = gzip.open(input_path, "rt")
|
||||
|
||||
# now that everythings downloaded, compile the db
|
||||
await ctx.db.executescript(
|
||||
|
@ -153,100 +140,113 @@ async def main_with_ctx(ctx, wanted_date):
|
|||
)
|
||||
await ctx.db.commit()
|
||||
|
||||
work_done = False
|
||||
|
||||
tags_csv_fd = output_fd["tags"]
|
||||
work_done = work_done or await work_tags(ctx, tags_csv_fd)
|
||||
|
||||
log.info("going to process posts")
|
||||
|
||||
posts_csv_fd = output_fd["posts"]
|
||||
any_posts = await work_posts(ctx, posts_csv_fd)
|
||||
work_done = work_done or any_posts
|
||||
|
||||
if work_done:
|
||||
log.info("vacuuming db...")
|
||||
await ctx.db.execute("vacuum")
|
||||
log.info("database built")
|
||||
|
||||
|
||||
async def work_tags(ctx, tags_csv_fd):
|
||||
tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
|
||||
tag_count = tag_count_rows[0][0]
|
||||
log.info("already have %d tags", tag_count)
|
||||
|
||||
work_done = False
|
||||
line_count = 0
|
||||
for line in tags_csv_fd:
|
||||
line_count += 1
|
||||
|
||||
with output_uncompressed_paths["tags"].open(
|
||||
mode="r", encoding="utf-8"
|
||||
) as tags_csv_fd:
|
||||
line_count = 0
|
||||
for line in tags_csv_fd:
|
||||
line_count += 1
|
||||
line_count -= 1 # remove header
|
||||
|
||||
line_count -= 1 # remove header
|
||||
log.info("%d tags to import", line_count)
|
||||
|
||||
log.info("%d tags to import", line_count)
|
||||
if line_count == tag_count:
|
||||
log.info("same counts, not going to reimport")
|
||||
return False
|
||||
else:
|
||||
tags_csv_fd.seek(0)
|
||||
tags_reader = csv.reader(tags_csv_fd)
|
||||
|
||||
if line_count == tag_count:
|
||||
log.info("same counts, not going to reimport")
|
||||
else:
|
||||
tags_csv_fd.seek(0)
|
||||
tags_reader = csv.reader(tags_csv_fd)
|
||||
assert len(next(tags_reader)) == 4
|
||||
|
||||
assert len(next(tags_reader)) == 4
|
||||
processed_count = 0
|
||||
processed_ratio = 0
|
||||
|
||||
processed_count = 0
|
||||
processed_ratio = 0
|
||||
for row in tags_reader:
|
||||
tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
|
||||
await ctx.db.execute(
|
||||
"insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
|
||||
(tag.id, tag.name, tag.category, tag.post_count),
|
||||
)
|
||||
processed_count += 1
|
||||
new_processed_ratio = round((processed_count / line_count) * 100)
|
||||
if new_processed_ratio != processed_ratio:
|
||||
log.info("tags processed at %d%%", processed_ratio)
|
||||
processed_ratio = new_processed_ratio
|
||||
|
||||
for row in tags_reader:
|
||||
tag = Tag(int(row[0]), row[1], int(row[2]), int(row[3]))
|
||||
await ctx.db.execute(
|
||||
"insert into tags (id, name, category, post_count) values (?, ?, ?, ?) on conflict do nothing",
|
||||
(tag.id, tag.name, tag.category, tag.post_count),
|
||||
)
|
||||
processed_count += 1
|
||||
new_processed_ratio = round((processed_count / line_count) * 100)
|
||||
if new_processed_ratio != processed_ratio:
|
||||
log.info("tags processed at %d%%", processed_ratio)
|
||||
processed_ratio = new_processed_ratio
|
||||
log.info("tags commit...")
|
||||
await ctx.db.commit()
|
||||
log.info("tags done...")
|
||||
return True
|
||||
|
||||
log.info("tags done")
|
||||
work_done = True
|
||||
|
||||
await ctx.db.commit()
|
||||
|
||||
log.info("going to process posts")
|
||||
async def work_posts(ctx, posts_csv_fd):
|
||||
post_count_rows = await ctx.db.execute_fetchall("select count(*) from posts")
|
||||
post_count = post_count_rows[0][0]
|
||||
log.info("already have %d posts", post_count)
|
||||
|
||||
with output_uncompressed_paths["posts"].open(
|
||||
mode="r", encoding="utf-8"
|
||||
) as posts_csv_fd:
|
||||
line_count = 0
|
||||
counter_reader = csv.reader(posts_csv_fd)
|
||||
for _row in counter_reader:
|
||||
line_count += 1
|
||||
line_count -= 1 # remove header
|
||||
line_count = 0
|
||||
counter_reader = csv.reader(posts_csv_fd)
|
||||
for _row in counter_reader:
|
||||
line_count += 1
|
||||
line_count -= 1 # remove header
|
||||
|
||||
log.info("%d posts to import", line_count)
|
||||
if line_count == post_count:
|
||||
log.info("already imported everything, skipping")
|
||||
else:
|
||||
posts_csv_fd.seek(0)
|
||||
posts_reader = csv.DictReader(posts_csv_fd)
|
||||
log.info("%d posts to import", line_count)
|
||||
if line_count == post_count:
|
||||
log.info("already imported everything, skipping")
|
||||
return False
|
||||
else:
|
||||
posts_csv_fd.seek(0)
|
||||
posts_reader = csv.DictReader(posts_csv_fd)
|
||||
|
||||
processed_count = 0
|
||||
processed_ratio = 0.0
|
||||
processed_count = 0
|
||||
processed_ratio = 0.0
|
||||
|
||||
for row in posts_reader:
|
||||
created_at_str = row["created_at"]
|
||||
created_at = datetime.strptime(
|
||||
created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
for row in posts_reader:
|
||||
created_at_str = row["created_at"]
|
||||
created_at = datetime.strptime(
|
||||
created_at_str[: created_at_str.find(".")], "%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
post = Post(
|
||||
id=int(row["id"]),
|
||||
uploader_id=int(row["uploader_id"]),
|
||||
created_at=int(created_at.timestamp()),
|
||||
md5=row["md5"],
|
||||
source=row["source"],
|
||||
rating=row["rating"],
|
||||
tag_string=row["tag_string"],
|
||||
is_deleted=e621_bool(row["is_deleted"]),
|
||||
is_pending=e621_bool(row["is_pending"]),
|
||||
is_flagged=e621_bool(row["is_flagged"]),
|
||||
score=int(row["score"]),
|
||||
up_score=int(row["up_score"]),
|
||||
down_score=int(row["down_score"]),
|
||||
is_rating_locked=e621_bool(row["is_rating_locked"]),
|
||||
)
|
||||
post = Post(
|
||||
id=int(row["id"]),
|
||||
uploader_id=int(row["uploader_id"]),
|
||||
created_at=int(created_at.timestamp()),
|
||||
md5=row["md5"],
|
||||
source=row["source"],
|
||||
rating=row["rating"],
|
||||
tag_string=row["tag_string"],
|
||||
is_deleted=e621_bool(row["is_deleted"]),
|
||||
is_pending=e621_bool(row["is_pending"]),
|
||||
is_flagged=e621_bool(row["is_flagged"]),
|
||||
score=int(row["score"]),
|
||||
up_score=int(row["up_score"]),
|
||||
down_score=int(row["down_score"]),
|
||||
is_rating_locked=e621_bool(row["is_rating_locked"]),
|
||||
)
|
||||
|
||||
await ctx.db.execute(
|
||||
"""
|
||||
await ctx.db.execute(
|
||||
"""
|
||||
insert into posts (
|
||||
id,
|
||||
uploader_id,
|
||||
|
@ -264,38 +264,33 @@ async def main_with_ctx(ctx, wanted_date):
|
|||
is_rating_locked
|
||||
) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?) on conflict do nothing
|
||||
""",
|
||||
(
|
||||
post.id,
|
||||
post.uploader_id,
|
||||
post.created_at,
|
||||
post.md5,
|
||||
post.source,
|
||||
post.rating,
|
||||
post.tag_string,
|
||||
post.is_deleted,
|
||||
post.is_pending,
|
||||
post.is_flagged,
|
||||
post.score,
|
||||
post.up_score,
|
||||
post.down_score,
|
||||
post.is_rating_locked,
|
||||
),
|
||||
)
|
||||
processed_count += 1
|
||||
new_processed_ratio = round((processed_count / line_count) * 100, 2)
|
||||
if str(new_processed_ratio) != str(processed_ratio):
|
||||
log.info("posts processed at %.2f%%", processed_ratio)
|
||||
processed_ratio = new_processed_ratio
|
||||
(
|
||||
post.id,
|
||||
post.uploader_id,
|
||||
post.created_at,
|
||||
post.md5,
|
||||
post.source,
|
||||
post.rating,
|
||||
post.tag_string,
|
||||
post.is_deleted,
|
||||
post.is_pending,
|
||||
post.is_flagged,
|
||||
post.score,
|
||||
post.up_score,
|
||||
post.down_score,
|
||||
post.is_rating_locked,
|
||||
),
|
||||
)
|
||||
processed_count += 1
|
||||
new_processed_ratio = round((processed_count / line_count) * 100, 2)
|
||||
if str(new_processed_ratio) != str(processed_ratio):
|
||||
log.info("posts processed at %.2f%%", processed_ratio)
|
||||
processed_ratio = new_processed_ratio
|
||||
|
||||
log.info("posts done")
|
||||
work_done = True
|
||||
|
||||
await ctx.db.commit()
|
||||
|
||||
if work_done:
|
||||
log.info("vacuuming db...")
|
||||
await ctx.db.execute("vacuum")
|
||||
log.info("database built")
|
||||
log.info("posts commit...")
|
||||
await ctx.db.commit()
|
||||
log.info("posts done")
|
||||
return True
|
||||
|
||||
|
||||
async def main():
|
||||
|
|
Loading…
Reference in a new issue