import sys import os import requests as RQ from datetime import datetime, timedelta from PyQt5.QtCore import QTimer, QThread, pyqtSignal, Qt, QObject from PyQt5.QtWidgets import ( QMainWindow, QApplication, QFileDialog, QProgressDialog, QTreeWidgetItem, QAction, QMessageBox, ) from urllib.request import Request, urlopen import gzip import pathlib from PyQt5.QtGui import QPalette, QColor import ed_lrr_gui import ed_lrr_gui.config as cfg from ed_lrr_gui.gui.ed_lrr import Ui_ED_LRR from ed_lrr_gui import Router, Preprocessor import multiprocessing as MP import queue import csv import _ed_lrr def sizeof_fmt(num, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "{:.02f}{}{}".format(num, unit, suffix) num /= 1024.0 return "{:.02f}{}{}".format(num, "Yi", suffix) def t_round(dt): return dt - dt % timedelta(seconds=1) class ProgressDialog(QProgressDialog): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.setWindowModality(Qt.WindowModal) class Job(QObject): progress = pyqtSignal("PyQt_PyObject") def __init__(self, app, cls, *args, build_progress=None, **kwargs): super().__init__() self.job = cls(*args, **kwargs) self.timer = QTimer(app) self.app = app self.timer.timeout.connect(self.interval) self.timer.start(100) self.last_val = None self.build_progress = build_progress self.progress_dialog = None self.state = {} def start(self): """ if self.diag_prog is None: self.diag_prog = ProgressDialog("", "Cancel", 0, 1000, self.main_window) if self.dl_thread: self.diag_prog.canceled.connect(self.dl_canceled) self.diag_prog.show() t_elapsed = datetime.today() - self.dl_started rate = args["done"] / t_elapsed.total_seconds() remaining = (args["size"] - args["done"]) / rate rate = round(rate, 2) # print(rate, remaining) try: t_rem = timedelta(seconds=remaining) except OverflowError: t_rem = "-" msg = "Downloading {} [{}/{}] ({}/s)\n[{}/{}]".format( filename, sizeof_fmt(args["done"]), sizeof_fmt(args["size"]), sizeof_fmt(rate), t_round(t_elapsed), t_round(t_rem), ) self.diag_prog.setLabelText(msg) self.diag_prog.setWindowTitle("Downloading EDSM Dumps") self.diag_prog.setValue((args["done"] * 1000) // args["size"]) """ self.started = datetime.today() if self.build_progress: self.progress_dialog = ProgressDialog("", "Cancel", 0, 1000, self.app) self.progress_dialog.canceled.connect(self.cancel) self.progress_dialog.show() self.progress.connect(self.__build_progress) else: self.progress.connect( lambda *args, **kwargs: print("PROGRESS:", *args, **kwargs) ) return self.job.start() def __build_progress(self, *args, **kwargs): kwargs["self"] = self return self.build_progress(*args, **kwargs) def cancel(self): self.job.terminate() self.job = None def done(self): return (self.job.is_alive() == False) and (self.job.queue.empty()) def interval(self): while True: try: res = self.job.queue.get(True, 0.1) except queue.Empty: return if res == self.last_val: continue self.state.update(res) self.progress.emit(self.state) self.last_val = res class DownloadThread(QThread): progress = pyqtSignal("PyQt_PyObject") def __init__(self, systems_url, systems_file, bodies_url, bodies_file): super().__init__() self.systems_url = systems_url self.systems_file = systems_file self.bodies_url = bodies_url self.bodies_file = bodies_file self.running = True def __del__(self): self.wait() def stop(self): self.running = False def run(self): dl_jobs = [ (self.systems_url, self.systems_file), (self.bodies_url, self.bodies_file), ] for url, dest in dl_jobs: outfile = url.split("/")[-1] size = RQ.head(url, headers={"Accept-Encoding": "None"}) size.raise_for_status() size = int(size.headers.get("Content-Length", 0)) with open(dest, "wb") as of: resp = RQ.get(url, stream=True) for chunk in resp.iter_content(1024 * 1024): of.write(chunk) self.progress.emit( {"done": of.tell(), "size": size, "outfile": outfile} ) if not self.running: return class App(QApplication): def __init__(self): super().__init__(sys.argv) self.setStyle("Fusion") self.setup_styles() def set_style(self, style): print("LOAD:", style) self.setPalette(self.styles[style]) def setup_styles(self): self.styles = {} styles = { "Dark": { "Window": QColor(53, 53, 53), "WindowText": Qt.white, "Base": QColor(15, 15, 15), "AlternateBase": QColor(53, 53, 53), "ToolTipBase": Qt.white, "ToolTipText": Qt.white, "Text": Qt.white, "Button": QColor(53, 53, 53), "ButtonText": Qt.white, "BrightText": Qt.red, "Highlight": QColor(255, 128, 0), "HighlightedText": Qt.black, } } for style, colors in styles.items(): palette = QPalette() for entry, color in colors.items(): palette.setColor(getattr(QPalette, entry), color) if color == Qt.darkGray: palette.setColor( QPalette.Disabled, getattr(QPalette, entry), QColor(53, 53, 53) ) else: palette.setColor( QPalette.Disabled, getattr(QPalette, entry), Qt.darkGray ) self.styles[style] = palette self.styles["Light"] = self.style().standardPalette() class ED_LRR(Ui_ED_LRR): dl_thread = None diag_prog = None dl_started = None system_found = pyqtSignal("PyQt_PyObject") def __init__(self): super().__init__() self.config = cfg.load() self.jobs = {} def start_job(self, cls, *args, **kwargs): print("START JOB:", cls, args, kwargs) name = cls.__name__ if name in self.jobs and self.jobs[name].done(): del self.jobs[name] if not name in self.jobs: self.jobs[name] = Job(self.app, cls, *args, **kwargs) self.jobs[name].start() def get_open_file(self, filetypes, callback=None): fileName, _ = QFileDialog.getOpenFileName( self.main_window, "Open file", str(cfg.data_dir), filetypes, options=QFileDialog.DontUseNativeDialog, ) if callback: return callback(fileName) return fileName def get_save_file(self, filetypes, callback=None): fileName, _ = QFileDialog.getSaveFileName( self.main_window, "Save file", str(cfg.data_dir), filetypes, options=QFileDialog.DontUseNativeDialog, ) if callback: return callback(fileName) return fileName def set_sys_lst(self, path): if path not in self.config.history_out_path: self.config.history_out_path.append(path) self.inp_sys_lst.addItem(path) self.inp_out_pp.addItem(path) self.inp_sys_lst.setCurrentText(path) self.inp_out_pp.setCurrentText(path) def set_bodies_file(self, path): if path not in self.config.history_bodies_path: self.config.history_bodies_path.append(path) self.inp_bodies_pp.addItem(path) def set_systems_file(self, path): if path not in self.config.history_systems_path: self.config.history_systems_path.append(path) self.inp_systems_pp.addItem(path) def update_dropdowns(self): return def log(self, *args): t = datetime.today() msg_t = "[{}] {}".format(t, str.format(*args)) self.txt_log.append(msg_t) def set_comp_mode(self, _): if self.rd_comp.isChecked(): comp_mode = "Compute Route" self.btn_add.setText("Add") if self.rd_precomp.isChecked(): comp_mode = "Precompute Graph" self.btn_add.setText("Select") self.log("COMP_MODE", comp_mode) self.lst_sys.setEnabled(self.rd_comp.isChecked()) self.btn_rm.setEnabled(self.rd_comp.isChecked()) self.cmb_mode.setEnabled(self.rd_comp.isChecked()) self.chk_permute.setEnabled(self.rd_comp.isChecked()) self.lbl_keep.setEnabled(self.rd_comp.isChecked()) self.lbl_mode.setEnabled(self.rd_comp.isChecked()) self.chk_permute_keep_first.setEnabled(self.rd_comp.isChecked()) self.chk_permute_keep_last.setEnabled(self.rd_comp.isChecked()) self.set_route_mode(self.rd_precomp.isChecked() or None) def set_route_mode(self, mode=None): if mode == None: mode = self.cmb_mode.currentText() self.lbl_greedyness.setEnabled(mode == "A*-Search") self.sld_greedyness.setEnabled(mode == "A*-Search") def set_greedyness(self, value): self.lbl_greedyness.setText("Greedyness Factor ({:.0%})".format(value / 100)) @property def systems(self): ret = [] for n in range(self.lst_sys.topLevelItemCount()): ret.append(self.sys_to_dict(n)) return ret def sys_to_dict(self, n): header = [ self.lst_sys.headerItem().data(c, 0) for c in range(self.lst_sys.headerItem().columnCount()) ] system = [ self.lst_sys.topLevelItem(n).data(c, 0) for c in range(self.lst_sys.topLevelItem(n).columnCount()) ] ret = dict(zip(header, system)) ret["id"] = getattr(self.lst_sys.topLevelItem(n), "__id__", None) ret.pop(None, None) return ret def error(self, msg): QMessageBox.critical(self.main_window, "ERROR!", msg) def get_sys_list(self): if not self.inp_sys_lst.currentText(): self.error("System list is required!") return path = pathlib.Path(self.inp_sys_lst.currentText()) if not path.exists(): self.error("System list does not exist, run download and preprocess first!") return return path def run(self): if not all(s["Type"] for s in self.systems): self.error('Not all systens have been resolved, please click "Search All"') return print(self.systems) systems = [str(s["id"]) for s in self.systems] jump_range = self.sb_range.value() mode = self.cmb_mode.currentText() primary = self.chk_primary.isChecked() keep_first = self.chk_permute_keep_first.isChecked() keep_last = self.chk_permute_keep_last.isChecked() permute = self.chk_permute.isChecked() greedyness = ( self.sld_greedyness.value() / 100 if self.sld_greedyness.isEnabled() else None ) path = self.get_sys_list() if path is None: return precomp = path.with_suffix(".idx") path = str(path) if not precomp.exists(): precomp = None else: precomp = str(precomp) mode = { "Breadth-First Search": "bfs", "A*-Search": "astar", "Greedy-Search": "greedy", }[mode] print( systems, jump_range, mode, primary, permute, (keep_first, keep_last), greedyness, path, precomp, ) self.start_job( Router, systems, jump_range, 0.1, mode, primary, permute, keep_first, keep_last, greedyness, precomp, path, ) def find_sys_by_names(self, names): t_s = datetime.today() if not self.get_sys_list(): return None ret = _ed_lrr.find_sys(names, self.inp_sys_lst.currentText()) print("Took:", datetime.today() - t_s) return ret def resolve_systems(self): names = [] for n in range(self.lst_sys.topLevelItemCount()): names.append(self.sys_to_dict(n)["Name"]) systems = self.find_sys_by_names(names) if systems is None: return for i, name in enumerate(names): _, system = systems[name] self.lst_sys.topLevelItem(i).setData(0, 0, system["system"]) self.lst_sys.topLevelItem(i).setData(1, 0, system["star_type"]) self.lst_sys.topLevelItem(i).__id__ = system["id"] # diff, item = self.find_sys_by_name(name) # print("Found", (diff, item)) def add_system(self): name = self.inp_sys.text() item = QTreeWidgetItem(self.lst_sys, [name, None]) item.resolved = False item.setFlags(item.flags() & ~Qt.ItemIsDropEnabled) def remove_system(self): root = self.lst_sys.invisibleRootItem() for item in self.lst_sys.selectedItems(): root.removeChild(item) def dl_canceled(self): if self.dl_thread: print("Cancel!") try: self.dl_thread.progress.disconnect() except TypeError: pass self.dl_thread.stop() self.dl_thread.wait() self.diag_prog.close() self.dl_thread = None self.diag_prog = None self.dl_started = None def handle_dl_progress(self, args): filename = os.path.split(args["outfile"])[-1] if self.diag_prog is None: self.diag_prog = ProgressDialog("", "Cancel", 0, 1000, self.main_window) if self.dl_thread: self.diag_prog.canceled.connect(self.dl_canceled) self.diag_prog.show() t_elapsed = datetime.today() - self.dl_started rate = args["done"] / t_elapsed.total_seconds() remaining = (args["size"] - args["done"]) / rate rate = round(rate, 2) # print(rate, remaining) try: t_rem = timedelta(seconds=remaining) except OverflowError: t_rem = "-" msg = "Downloading {} [{}/{}] ({}/s)\n[{}/{}]".format( filename, sizeof_fmt(args["done"]), sizeof_fmt(args["size"]), sizeof_fmt(rate), t_round(t_elapsed), t_round(t_rem), ) self.diag_prog.setLabelText(msg) self.diag_prog.setWindowTitle("Downloading EDSM Dumps") self.diag_prog.setValue((args["done"] * 1000) // args["size"]) def run_download(self): if self.dl_thread: return self.dl_started = datetime.today() self.dl_thread = DownloadThread( self.inp_systems_dl.currentText(), self.inp_systems_dest_dl.currentText(), self.inp_bodies_dl.currentText(), self.inp_bodies_dest_dl.currentText(), ) self.dl_thread.progress.connect(self.handle_dl_progress) self.dl_thread.start() print(".") def update_permute_chk(self, state): self.chk_permute_keep_first.setEnabled(state) self.chk_permute_keep_last.setEnabled(state) self.lbl_keep.setEnabled(state) def setup_signals(self): self.btn_download.clicked.connect(self.run_download) self.inp_systems_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\s.json") self.inp_bodies_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\b.json") self.set_greedyness(self.sld_greedyness.value()) self.cmb_mode.currentTextChanged.connect(self.set_route_mode) self.rd_comp.toggled.connect(self.set_comp_mode) self.rd_precomp.toggled.connect(self.set_comp_mode) self.sld_greedyness.valueChanged.connect(self.set_greedyness) self.btn_go.clicked.connect(self.run) self.btn_add.clicked.connect(self.add_system) self.btn_rm.clicked.connect(self.remove_system) self.chk_permute.stateChanged.connect(self.update_permute_chk) self.btn_search.clicked.connect(self.resolve_systems) self.btn_out_browse_pp.clicked.connect( lambda: self.get_save_file("CSV File (*.csv)", self.set_sys_lst) ) self.btn_sys_lst_browse.clicked.connect( lambda: self.get_open_file("CSV File (*.csv)", self.set_sys_lst) ) self.btn_bodies_browse_pp.clicked.connect( lambda: self.get_open_file("JSON File (*.json)", self.set_bodies_file) ) self.btn_bodies_dest_browse_dl.clicked.connect( lambda: self.get_save_file("JSON File (*.json)", self.set_bodies_file) ) self.btn_systems_browse_pp.clicked.connect( lambda: self.get_open_file("JSON File (*.json)", self.set_systems_file) ) self.btn_systems_dest_browse_dl.clicked.connect( lambda: self.get_save_file("JSON File (*.json)", self.set_systems_file) ) def handle_close(self): cfg.write(self.config) print("BYEEEEEE!") def setup_styles(self, win, app): for name in app.styles: action = QAction(app) action.setObjectName("action_load_style_" + name) action.setText(name) action.triggered.connect(lambda _, name=name: app.set_style(name)) self.menuStyle.addAction(action) def setupUi(self, MainWindow, app): super().setupUi(MainWindow) self.update_dropdowns() self.main_window = MainWindow self.app = app self.setup_signals() self.lst_sys.setHeaderLabels(["Name", "Type"]) self.set_route_mode() self.update_permute_chk(self.chk_permute.isChecked()) self.setup_styles(MainWindow, app) def main(): app = App() MainWindow = QMainWindow() ui = ED_LRR() ui.setupUi(MainWindow, app) MainWindow.show() ret = app.exec_() ui.handle_close() exit(ret) if __name__ == "__main__": MP.freeze_support() main()