import gc
import itertools as ITT
import json
import os
import subprocess as SP
import sys
from datetime import timedelta
from glob import glob

import numpy as np
import pandas as pd
import vaex as vx
from matplotlib import cm
from PIL import Image, ImageDraw, ImageFont
from skimage import exposure
from skimage.io import imsave
from skimage.util import img_as_ubyte

# The frame buffer is allocated as (base_size[0], base_size[1], 4).
base_size = 1080, 1920
# Depth increment between two consecutive video frames.
steps = 1
# Passed to ffmpeg through "-r".
framerate = 25
# First command line argument; raises IndexError when the script is started
# without any argument. The name is rebound by the driver loop at the bottom.
rh_fn = sys.argv[1]


def scale_to(width=None, height=None):
    """Build a function that maps a source extent to integer bin counts.

    Args:
        width: Target size of the first dimension, or None to derive it.
        height: Target size of the second dimension, or None to derive it.

    Returns:
        A callable ``f(w, h) -> (int, int)``:

        * only ``width`` given: ``(width, width * h / w)``
        * only ``height`` given: ``(height * w / h, height)``
        * both given: ``(width, height)``
        * neither given: ``(w, h)``

        Every component is truncated with ``int``.
    """

    def target_size(w, h):
        if width is None and height is None:
            # Nothing requested: keep the source extent.
            size = (w, h)
        elif height is None:
            # Fix the first dimension and keep the aspect ratio.
            size = (width, width * (h / w))
        elif width is None:
            # Fix the second dimension and keep the aspect ratio.
            size = (height * (w / h), height)
        else:
            # Both requested: use them verbatim.
            size = (width, height)
        return tuple(map(int, size))

    return target_size


# xz -1 1
# Keys are (two axis letters, sign of first axis, sign of second axis);
# values compute the histogram shape from the extent of the two axes.
bining = {
    ("zx", -1, 1): scale_to(width=base_size[0]),  # main view, top down
    # ('yx',1,1): lambda size,w,h: (size,int(size*(w/h))), #
    # ('zy',-1,1): lambda size,w,h: (int(size*(h/w)),size), #
}


def apply_depth(stars, rh_fn):
    """Attach a ``depth`` column from a route-history CSV to the star table.

    The CSV is read with the column names ``id`` and ``depth`` and indexed by
    ``id``. The vaex dataframe is converted to pandas, ``depth`` is set to
    ``route_hist.depth + 1.0`` (pandas aligns the assignment on the index, so
    stars without an entry end up as NaN), and the result is converted back to
    vaex.

    Args:
        stars: vaex dataframe of stars (converted via ``to_pandas_df``).
        rh_fn: Path of the route-history CSV.

    Returns:
        A tuple ``(stars, d_max)`` where ``stars`` is the new vaex dataframe
        and ``d_max`` is the maximum of the raw ``depth`` column in the CSV
        (without the ``+ 1.0`` offset).
    """
    print("Loading", rh_fn, flush=True, end=" ")
    route_hist = pd.read_csv(
        rh_fn,
        names=["id", "depth"],
        index_col=0,
        dtype={"depth": int},
        low_memory=False,
    )
    print("OK")
    print("Converting to pandas dataframe", flush=True, end=" ")
    stars = stars.to_pandas_df()
    gc.collect()
    print("OK")
    print("Applying depth", flush=True, end=" ")
    stars["depth"] = float("nan")
    print("...", flush=True, end=" ")
    stars["depth"] = route_hist.depth + 1.0
    print("OK")
    print("Converting to vaex dataframe", flush=True, end=" ")
    stars = vx.from_pandas(stars, copy_index=False)
    gc.collect()
    print("OK")
    return stars, route_hist.depth.max()


# Reference only: a Rust struct whose field names match the stars.csv column
# names used below (``pos`` corresponds to the x, y, z columns).
"""
#[derive(Debug, Clone, Serialize, Deserialize, IntoPyObject)]
pub struct System {
    /// Unique System id
    pub id: u32,
    /// Star system
    pub name: String,
    /// Number of bodies
    pub num_bodies: u8,
    /// Does the system have a scoopable star?
    pub has_scoopable: bool,
    /// Jump range multiplier (1.5 for white dwarfs, 4.0 for neutron stars, 1.0 otherwise)
    pub mult: f32,
    /// Position
    pub pos: [f32; 3],
}
"""

print("Loading stars.csv")
stars = pd.read_csv(
    "stars.csv",
    names=["id", "name", "num_bodies", "has_scoopable", "mult", "x", "y", "z"],
    usecols=["id", "num_bodies", "x", "y", "z", "mult"],
    index_col=0,
)
stars = vx.from_pandas(stars, copy_index=False)


def render(stars, rh_fn):
    """Render one route-history file to ``img/<basename>.mkv`` through ffmpeg.

    One frame is produced per depth value in ``range(0, d_max, steps)``. Each
    frame layers three binned maps: all stars (green), stars with ``mult > 1``
    (blue) and bins whose normalised mean depth is at most ``d / d_max``
    (red). A text overlay with statistics is drawn on every frame, and frames
    are piped to ffmpeg as BMP images.

    If a file ``<rh_fn without extension>.json`` exists, its ``route`` and
    ``dt`` entries feed the overlay; otherwise placeholders are shown.

    Args:
        stars: vaex dataframe of stars with ``num_bodies``, ``mult`` and the
            axis columns referenced by ``bining``.
        rh_fn: Path of the route-history CSV to visualise.

    Returns:
        None. Returns early, before any expensive work, when the output file
        already exists.
    """
    print("Rendering")
    basename = os.path.splitext(os.path.split(rh_fn)[-1])[0]
    filename = "img/{}.mkv".format(basename)
    # Check first: there is no point loading and merging the route history
    # when the video is already there.
    if os.path.isfile(filename):
        return

    json_file = os.path.splitext(rh_fn)[0] + ".json"
    if os.path.isfile(json_file):
        with open(json_file) as fh:
            route_info = json.load(fh)
        route_len = len(route_info["route"])
        elapsed = route_info["dt"]
        time_taken = str(timedelta(seconds=elapsed))
    else:
        route_len = 0
        elapsed = -1.0  # sentinel: no timing information available
        time_taken = "N/A"
    # Rates are only meaningful for a positive duration; otherwise show 0
    # instead of a negative number or a ZeroDivisionError.
    has_timing = elapsed > 0
    route_rate = route_len / elapsed if has_timing else 0

    stars, d_max = apply_depth(stars, rh_fn)
    total = stars.length()

    # Depth values of all stars that have one; independent of the view.
    d_array = stars[~stars["depth"].isna()]["depth"].values
    exploration_rate = (d_array <= d_max).sum() / elapsed if has_timing else 0.0

    # Load the font before spawning ffmpeg so a missing font does not leave
    # an encoder process behind.
    fnt = ImageFont.truetype(r"FiraCode-Regular", 40)

    ffmpeg = SP.Popen(
        [
            "ffmpeg",
            "-y",
            "-f",
            "image2pipe",
            "-probesize",
            "128M",
            "-i",
            "-",
            "-crf",
            "17",
            "-preset",
            "veryslow",
            "-r",
            str(framerate),
            "-pix_fmt",
            "yuv420p",
            filename,
        ],
        stdin=SP.PIPE,
        bufsize=0,
    )
    try:
        for (binby_key, m1, m2), calcshape in bining.items():
            binby = [m1 * stars[binby_key[0]], m2 * stars[binby_key[1]]]
            mm = [binby[0].minmax(), binby[1].minmax()]
            w, h = [mm[0][1] - mm[0][0], mm[1][1] - mm[1][0]]
            shape = calcshape(w, h)

            # Layer 1: number of bodies per bin, histogram-equalised to [0, 1].
            hm_all = stars.sum(
                "num_bodies", binby=binby, shape=shape, limits="minmax"
            )
            hm_all_mask = hm_all != 0
            hm_all = exposure.equalize_hist(hm_all)
            hm_all -= hm_all.min()
            hm_all /= hm_all.max()

            # Layer 2: count of stars with a jump range multiplier above 1.
            hm_boost = stars.sum(
                "astype(mult>1.0,'int')",
                binby=binby,
                shape=shape,
                limits="minmax",
            )
            hm_boost_mask = hm_boost != 0
            hm_boost = exposure.equalize_hist(hm_boost)
            hm_boost -= hm_boost.min()
            hm_boost /= hm_boost.max()

            G = cm.Greens_r(hm_all)
            B = cm.Blues_r(hm_boost)

            # Layer 3: mean depth per bin, scaled by d_max; bins without any
            # depth value are set to 0 and masked out below.
            hm_exp = stars.mean(
                "depth", binby=binby, shape=shape, limits="minmax"
            )
            hm_exp[np.isnan(hm_exp)] = 0.0
            hm_exp -= hm_exp.min()
            hm_exp /= d_max
            R = cm.Reds_r(hm_exp)
            hm_exp_mask_base = hm_exp != 0.0

            img = np.zeros((base_size[0], base_size[1], 4))
            print("Total frames:", d_max)
            for d in range(0, d_max, steps):
                # Reveal only the bins reached up to the current depth.
                hm_exp_mask = np.logical_and(
                    hm_exp_mask_base, hm_exp <= (d / d_max)
                )
                num_explored = (d_array <= d).sum()

                # Reset to opaque black, then paint the layers bottom to top.
                img[:, :, :] = 0.0
                img[:, :, 3] = 1.0
                canvas = img[: shape[0], : shape[1], :]
                canvas[hm_all_mask] = G[hm_all_mask]
                canvas[hm_boost_mask] = B[hm_boost_mask]
                canvas[hm_exp_mask] = R[hm_exp_mask]

                pil_img = Image.fromarray(img_as_ubyte(img))
                draw = ImageDraw.Draw(pil_img)
                messages = [
                    "Filename: {}".format(basename),
                    "Total Stars: {:,}".format(total),
                    "Explored: {:,} ({:.2%})".format(
                        num_explored, num_explored / total
                    ),
                    "Search Depth: {:,}/{:,}".format(d, route_len),
                    "Time: {}".format(time_taken),
                    "Rate: {:.3f} waypoints/s".format(route_rate),
                    "Exploration Rate: {:.3f} stars/s".format(exploration_rate),
                ]
                # NOTE(review): the x offset is shape[0], while the map spans
                # shape[1] image columns - confirm this is the intended
                # position when the view is not square.
                draw.multiline_text(
                    (shape[0], 0), "\n".join(messages), font=fnt
                )
                pil_img.save(ffmpeg.stdin, "bmp")
    finally:
        # Always close the pipe and reap the encoder, even when a frame fails.
        ffmpeg.stdin.close()
        ffmpeg.wait()


# Every command line argument is treated as a glob pattern; each matching
# route-history file is rendered in turn.
for rh_fn in ITT.chain.from_iterable(map(glob, sys.argv[1:])):
    render(stars, rh_fn)