Compare commits
No commits in common. "930b48e55b1b010c423cfc018c1cead1e152c79c" and "77466bab7c704ee4a0829d8d13ff9bd2bc0a19d5" have entirely different histories.
930b48e55b...77466bab7c
2 changed files with 121 additions and 116 deletions
@@ -61,7 +61,7 @@ async def main_with_ctx(ctx, wanted_date):
     }

     output_compressed_paths = {}
-    output_fd = {}
+    output_uncompressed_paths = {}

     for url_type, url in urls.items():
         parsed = urlparse(url)
@@ -69,6 +69,10 @@ async def main_with_ctx(ctx, wanted_date):
         output_path = Path.cwd() / parsed_path.name
         output_compressed_paths[url_type] = output_path

+        original_name, original_extension, _gz = parsed_path.name.split(".")
+        output_uncompressed = Path.cwd() / f"{original_name}.{original_extension}"
+        output_uncompressed_paths[url_type] = output_uncompressed
+
     for url_type, url in urls.items():
         output_path = output_compressed_paths[url_type]
         if output_path.exists():
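The derived names here assume dump files of the exact form name.ext.gz: splitting on "." must yield three parts or the unpack raises ValueError. A minimal sketch of that step in isolation (the sample filename is hypothetical):

    from pathlib import Path

    archive = Path("tags-2024-05-01.csv.gz")  # hypothetical dump name
    # exactly two dots expected: <name>.<ext>.gz
    original_name, original_extension, _gz = archive.name.split(".")
    print(f"{original_name}.{original_extension}")  # -> tags-2024-05-01.csv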
@@ -104,7 +108,16 @@ async def main_with_ctx(ctx, wanted_date):
     # decompress
     for url_type, _url in urls.items():
         input_path = output_compressed_paths[url_type]
-        output_fd[url_type] = gzip.open(input_path, "rt")
+        output_path = output_uncompressed_paths[url_type]
+
+        if output_path.exists():
+            log.info("decompressed file %s already exists, ignoring", output_path)
+            continue
+
+        log.info("decompressing %s into %s", input_path.name, output_path.name)
+        with gzip.open(input_path, "rb") as in_fd:
+            with output_path.open(mode="wb") as out_fd:
+                shutil.copyfileobj(in_fd, out_fd)

     # now that everythings downloaded, compile the db
     await ctx.db.executescript(
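The old side kept open gzip handles in output_fd and streamed text straight out of the archives; the new side decompresses each dump to disk once and reopens the plain CSV later, skipping files that already exist. The added copy loop as a standalone, runnable sketch (paths hypothetical); shutil.copyfileobj works in fixed-size chunks, so memory use stays flat no matter how large the dump is:

    import gzip
    import shutil
    from pathlib import Path

    input_path = Path("posts.csv.gz")  # hypothetical compressed dump
    output_path = Path("posts.csv")

    # chunked copy: the whole archive is never held in memory
    with gzip.open(input_path, "rb") as in_fd:
        with output_path.open(mode="wb") as out_fd:
            shutil.copyfileobj(in_fd, out_fd)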
@@ -140,28 +153,15 @@ async def main_with_ctx(ctx, wanted_date):
     )
     await ctx.db.commit()

-    work_done = False
-
-    tags_csv_fd = output_fd["tags"]
-    work_done = work_done or await work_tags(ctx, tags_csv_fd)
-
-    log.info("going to process posts")
-
-    posts_csv_fd = output_fd["posts"]
-    any_posts = await work_posts(ctx, posts_csv_fd)
-    work_done = work_done or any_posts
-
-    if work_done:
-        log.info("vacuuming db...")
-        await ctx.db.execute("vacuum")
-        log.info("database built")
-
-
-async def work_tags(ctx, tags_csv_fd):
     tag_count_rows = await ctx.db.execute_fetchall("select count(*) from tags")
     tag_count = tag_count_rows[0][0]
     log.info("already have %d tags", tag_count)

+    work_done = False
+
+    with output_uncompressed_paths["tags"].open(
+        mode="r", encoding="utf-8"
+    ) as tags_csv_fd:
     line_count = 0
     for line in tags_csv_fd:
         line_count += 1
@@ -172,7 +172,6 @@ async def work_tags(ctx, tags_csv_fd):

     if line_count == tag_count:
         log.info("same counts, not going to reimport")
-        return False
     else:
         tags_csv_fd.seek(0)
         tags_reader = csv.reader(tags_csv_fd)
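These two hunks inline work_tags into main_with_ctx: the function header, its return statements, and the output_fd plumbing go away, and the tag CSV is opened from output_uncompressed_paths instead. The reimport heuristic itself is unchanged: if the file has as many lines as the tags table has rows, the import is skipped. Roughly, as a sketch against an aiosqlite-style connection (function name and signature are illustrative, not from the diff):

    async def tags_need_import(db, tags_path):
        # rows already imported into the tags table
        rows = await db.execute_fetchall("select count(*) from tags")
        tag_count = rows[0][0]

        # lines in the decompressed CSV (header included, as in the diff)
        with tags_path.open(mode="r", encoding="utf-8") as fd:
            line_count = sum(1 for _ in fd)

        # equal counts are treated as "already imported"
        return line_count != tag_count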
@@ -194,17 +193,19 @@ async def work_tags(ctx, tags_csv_fd):
                 log.info("tags processed at %d%%", processed_ratio)
                 processed_ratio = new_processed_ratio

-        log.info("tags commit...")
+        log.info("tags done")
+        work_done = True

         await ctx.db.commit()
-    log.info("tags done...")
-    return True

-async def work_posts(ctx, posts_csv_fd):
+    log.info("going to process posts")
     post_count_rows = await ctx.db.execute_fetchall("select count(*) from posts")
     post_count = post_count_rows[0][0]
     log.info("already have %d posts", post_count)

+    with output_uncompressed_paths["posts"].open(
+        mode="r", encoding="utf-8"
+    ) as posts_csv_fd:
     line_count = 0
     counter_reader = csv.reader(posts_csv_fd)
     for _row in counter_reader:
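Posts get the same inlining treatment and keep the two-pass read: records are first counted with csv.reader (presumably because quoted fields can span physical lines, which a raw line count would miscount), then the file is rewound with seek(0) and re-read through csv.DictReader. The pattern in isolation (filename hypothetical):

    import csv

    with open("posts.csv", mode="r", encoding="utf-8", newline="") as fd:
        # pass 1: count records; csv.reader tracks quoted multi-line fields
        record_count = sum(1 for _ in csv.reader(fd))

        # pass 2: rewind and read rows as dicts keyed by the header row
        fd.seek(0)
        for row in csv.DictReader(fd):
            pass  # import the row here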
@@ -214,7 +215,6 @@ async def work_posts(ctx, posts_csv_fd):
     log.info("%d posts to import", line_count)
     if line_count == post_count:
         log.info("already imported everything, skipping")
-        return False
     else:
         posts_csv_fd.seek(0)
         posts_reader = csv.DictReader(posts_csv_fd)
@@ -287,10 +287,15 @@ async def work_posts(ctx, posts_csv_fd):
                 log.info("posts processed at %.2f%%", processed_ratio)
                 processed_ratio = new_processed_ratio

-        log.info("posts commit...")
-        await ctx.db.commit()
         log.info("posts done")
-    return True
+        work_done = True
+
+    await ctx.db.commit()
+
+    if work_done:
+        log.info("vacuuming db...")
+        await ctx.db.execute("vacuum")
+        log.info("database built")


 async def main():
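The commit and the vacuum now run once at the tail of main_with_ctx instead of inside each worker, gated on the work_done flag the two import blocks set. VACUUM rewrites the entire database file to reclaim free pages, so skipping it when nothing was imported is a real saving. A sketch of the tail logic with aiosqlite (the wrapper function and db path are hypothetical):

    import aiosqlite

    async def finalize(db_path, work_done):
        async with aiosqlite.connect(db_path) as db:
            await db.commit()
            if work_done:
                # rewrites the whole file; worth skipping when the
                # run imported nothing
                await db.execute("vacuum")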
@@ -1,3 +1,3 @@
-aiohttp==3.9.1
-aiosqlite==0.19.0
-Quart==0.19.4
+aiohttp==3.8.1
+aiosqlite==0.17.0
+Quart==0.18.0
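The second file pins all three runtime dependencies to older releases (aiohttp 3.9.1 to 3.8.1, aiosqlite 0.19.0 to 0.17.0, Quart 0.19.4 to 0.18.0). Assuming it is a standard pip requirements file, either side installs with:

    pip install -r <requirements file>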