Compare commits

...

2 commits

Author SHA1 Message Date
930b48e55b bump dependencies 2023-12-22 21:21:56 -03:00
b0c729789f streaming decompression for less storage use 2023-12-22 21:21:41 -03:00
2 changed files with 116 additions and 121 deletions

View file

@ -61,7 +61,7 @@ async def main_with_ctx(ctx, wanted_date):
}
output_compressed_paths = {}
output_uncompressed_paths = {}
output_fd = {}
for url_type, url in urls.items():
parsed = urlparse(url)
@ -69,10 +69,6 @@ async def main_with_ctx(ctx, wanted_date):
output_path = Path.cwd() / parsed_path.name
output_compressed_paths[url_type] = output_path
original_name, original_extension, _gz = parsed_path.name.split(".")
output_uncompressed = Path.cwd() / f"{original_name}.{original_extension}"
output_uncompressed_paths[url_type] = output_uncompressed
for url_type, url in urls.items():
output_path = output_compressed_paths[url_type]
if output_path.exists():
@ -108,16 +104,7 @@ async def main_with_ctx(ctx, wanted_date):
# decompress
for url_type, _url in urls.items():
input_path = output_compressed_paths[url_type]
output_path = output_uncompressed_paths[url_type]
if output_path.exists():
log.info("decompressed file %s already exists, ignoring", output_path)
continue
log.info("decompressing %s into %s", input_path.name, output_path.name)
with gzip.open(input_path, "rb") as in_fd:
with output_path.open(mode="wb") as out_fd:
shutil.copyfileobj(in_fd, out_fd)
output_fd[url_type] = gzip.open(input_path, "rt")
# now that everythings downloaded, compile the db
await ctx.db.executescript(
@ -153,15 +140,28 @@ async def main_with_ctx(ctx, wanted_date):
)
await ctx.db.commit()
work_done = False
tags_csv_fd = output_fd["tags"]
work_done = work_done or await work_tags(ctx, tags_csv_fd)
log.info("going to process posts")
posts_csv_fd = output_fd["posts"]
any_posts = await work_posts(ctx, posts_csv_fd)
work_done = work_done or any_posts
if work_done:
log.info("vacuuming db...")
await ctx.db.execute("vacuum")
log.info("database built")
async def work_tags(ctx, tags_csv_fd):
tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
tag_count = tag_count_rows[0][0]
log.info("already have %d tags", tag_count)
work_done = False
with output_uncompressed_paths["tags"].open(
mode="r", encoding="utf-8"
) as tags_csv_fd:
line_count = 0
for line in tags_csv_fd:
line_count += 1
@ -172,6 +172,7 @@ async def main_with_ctx(ctx, wanted_date):
if line_count == tag_count:
log.info("same counts, not going to reimport")
return False
else:
tags_csv_fd.seek(0)
tags_reader = csv.reader(tags_csv_fd)
@ -193,19 +194,17 @@ async def main_with_ctx(ctx, wanted_date):
log.info("tags processed at %d%%", processed_ratio)
processed_ratio = new_processed_ratio
log.info("tags done")
work_done = True
log.info("tags commit...")
await ctx.db.commit()
log.info("tags done...")
return True
log.info("going to process posts")
async def work_posts(ctx, posts_csv_fd):
post_count_rows = await ctx.db.execute_fetchall("select count(*) from posts")
post_count = post_count_rows[0][0]
log.info("already have %d posts", post_count)
with output_uncompressed_paths["posts"].open(
mode="r", encoding="utf-8"
) as posts_csv_fd:
line_count = 0
counter_reader = csv.reader(posts_csv_fd)
for _row in counter_reader:
@ -215,6 +214,7 @@ async def main_with_ctx(ctx, wanted_date):
log.info("%d posts to import", line_count)
if line_count == post_count:
log.info("already imported everything, skipping")
return False
else:
posts_csv_fd.seek(0)
posts_reader = csv.DictReader(posts_csv_fd)
@ -287,15 +287,10 @@ async def main_with_ctx(ctx, wanted_date):
log.info("posts processed at %.2f%%", processed_ratio)
processed_ratio = new_processed_ratio
log.info("posts done")
work_done = True
log.info("posts commit...")
await ctx.db.commit()
if work_done:
log.info("vacuuming db...")
await ctx.db.execute("vacuum")
log.info("database built")
log.info("posts done")
return True
async def main():

View file

@ -1,3 +1,3 @@
aiohttp==3.8.1
aiosqlite==0.17.0
Quart==0.18.0
aiohttp==3.9.1
aiosqlite==0.19.0
Quart==0.19.4