"""Qt front-end for ED_LRR: download dumps, resolve systems and start routing jobs."""

import csv
import gzip
import multiprocessing as MP
import os
import pathlib
import queue
import sys
from datetime import datetime, timedelta
from sys import exit
from urllib.request import Request, urlopen

import requests as RQ
from PyQt5.QtCore import QObject, Qt, QThread, QTimer, pyqtSignal
from PyQt5.QtGui import QColor, QPalette, QIcon
from PyQt5.QtWidgets import (
    QAction,
    QApplication,
    QFileDialog,
    QMainWindow,
    QMessageBox,
    QProgressDialog,
    QTreeWidgetItem,
)

import _ed_lrr
import ed_lrr_gui
import ed_lrr_gui.config as cfg
from ed_lrr_gui import Preprocessor, Router
from ed_lrr_gui.gui.ed_lrr import Ui_ED_LRR


def sizeof_fmt(num, suffix="B"):
    """Format ``num`` with a binary (1024-based) unit prefix and ``suffix``.

    Returns a string such as ``"1.50KiB"``; values of 1024**8 and above are
    reported with the ``Yi`` prefix.
    """
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return "{:.02f}{}{}".format(num, unit, suffix)
        num /= 1024.0
    return "{:.02f}{}{}".format(num, "Yi", suffix)


def t_round(dt):
    """Return the timedelta ``dt`` with its sub-second remainder removed."""
    return dt - dt % timedelta(seconds=1)


class ProgressDialog(QProgressDialog):
    """QProgressDialog that is always window-modal."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setWindowModality(Qt.WindowModal)


class Job(QObject):
    """Wrap a worker object and re-emit its queued updates as a Qt signal.

    The wrapped object (``cls(*args, **kwargs)``) is used through
    ``start()``, ``terminate()``, ``is_alive()`` and a ``queue`` attribute;
    presumably it is a ``multiprocessing.Process``-like worker (verify
    against ``ed_lrr_gui.Router`` / ``Preprocessor``). Items taken from the
    queue are merged into ``self.state`` with ``dict.update``.
    """

    # Emitted with the accumulated state dict after every new queue item.
    progress = pyqtSignal("PyQt_PyObject")

    def __init__(self, app, main_window, cls, *args, **kwargs):
        super().__init__()
        self.job = cls(*args, **kwargs)
        self.timer = QTimer(app)
        self.app = app
        self.main_window = main_window
        self.timer.timeout.connect(self.interval)
        self.timer.start(100)  # poll the worker queue every 100 ms
        self.last_val = None
        self.progress_dialog = None
        self.handle_progess = None
        self.state = {}

    def setup_progress(self, handle_progess):
        """Connect ``handle_progess(job, state)`` to the progress signal."""
        self.progress.connect(
            lambda *args, **kwargs: handle_progess(self, *args, **kwargs)
        )

    def start(self):
        """Record the start time and start the wrapped worker.

        While no progress dialog is attached, progress updates are printed
        to stdout.
        """
        if self.progress_dialog is None:
            self.progress.connect(
                lambda *args, **kwargs: print("PROGRESS:", *args, **kwargs)
            )
        self.started = datetime.today()
        return self.job.start()

    def cancel(self):
        """Terminate the worker and stop polling; safe to call repeatedly."""
        # Stop polling first so interval() never sees a half-cancelled job.
        self.timer.stop()
        job, self.job = self.job, None
        if job is not None:
            job.terminate()

    def done(self):
        """Return True once the worker is gone or dead with an empty queue."""
        job = self.job
        if job is None:
            # A cancelled job counts as finished.
            return True
        return (not job.is_alive()) and job.queue.empty()

    def interval(self):
        """Timer slot: drain pending queue items and emit progress for each.

        Runs on the thread that owns the timer, so it must not block: items
        are fetched with ``get_nowait`` and anything that arrives later is
        picked up on the next tick.
        """
        job = self.job
        if job is None:
            return
        while True:
            try:
                res = job.queue.get_nowait()
            except queue.Empty:
                return
            if res == self.last_val:
                # Skip consecutive duplicate updates.
                continue
            self.state.update(res)
            self.progress.emit(self.state)
            self.last_val = res
            if self.job is None:
                # A progress handler cancelled the job; stop draining.
                return


class DownloadThread(QThread):
    """Background thread that downloads the systems and bodies files."""

    # Emitted after each chunk with {"done", "size", "outfile"}.
    progress = pyqtSignal("PyQt_PyObject")

    def __init__(self, systems_url, systems_file, bodies_url, bodies_file):
        super().__init__()
        self.systems_url = systems_url
        self.systems_file = systems_file
        self.bodies_url = bodies_url
        self.bodies_file = bodies_file
        self.running = True

    def __del__(self):
        self.wait()

    def stop(self):
        """Ask ``run`` to return after the chunk currently being written."""
        self.running = False

    def run(self):
        """Download both URLs sequentially, emitting progress per chunk.

        Raises:
            requests.HTTPError: if the HEAD or GET request returns an
                error status.
        """
        dl_jobs = [
            (self.systems_url, self.systems_file),
            (self.bodies_url, self.bodies_file),
        ]
        for url, dest in dl_jobs:
            if not self.running:
                return
            outfile = url.split("/")[-1]
            head = RQ.head(url, headers={"Accept-Encoding": "None"})
            head.raise_for_status()
            # 0 means the server did not report a length.
            size = int(head.headers.get("Content-Length", 0))
            # Issue and check the request before opening the destination so
            # a failed request does not truncate an existing file; the
            # context manager releases the connection on every exit path.
            with RQ.get(url, stream=True) as resp:
                resp.raise_for_status()
                with open(dest, "wb") as of:
                    for chunk in resp.iter_content(1024 * 1024):
                        of.write(chunk)
                        self.progress.emit(
                            {"done": of.tell(), "size": size, "outfile": outfile}
                        )
                        if not self.running:
                            return


class App(QApplication):
    """QApplication using the Fusion style with selectable palettes."""

    def __init__(self):
        super().__init__(sys.argv)
        self.setStyle("Fusion")
        self.setup_styles()

    def set_style(self, style):
        """Apply the palette registered under ``style``."""
        print("LOAD:", style)
        self.setPalette(self.styles[style])

    def setup_styles(self):
        """Build ``self.styles``: a "Dark" palette plus the standard "Light"."""
        self.styles = {}
        styles = {
            "Dark": {
                "Window": QColor(53, 53, 53),
                "WindowText": Qt.white,
                "Base": QColor(15, 15, 15),
                "AlternateBase": QColor(53, 53, 53),
                "ToolTipBase": Qt.white,
                "ToolTipText": Qt.white,
                "Text": Qt.white,
                "Button": QColor(53, 53, 53),
                "ButtonText": Qt.white,
                "BrightText": Qt.red,
                "Highlight": QColor(255, 128, 0),
                "HighlightedText": Qt.black,
            }
        }
        for style, colors in styles.items():
            palette = QPalette()
            for entry, color in colors.items():
                role = getattr(QPalette, entry)
                palette.setColor(role, color)
                # Disabled widgets use dark gray, unless the active colour
                # already is dark gray.
                if color == Qt.darkGray:
                    disabled = QColor(53, 53, 53)
                else:
                    disabled = Qt.darkGray
                palette.setColor(QPalette.Disabled, role, disabled)
            self.styles[style] = palette
        self.styles["Light"] = self.style().standardPalette()


class ED_LRR(Ui_ED_LRR):
    """Main-window logic layered on the generated ``Ui_ED_LRR`` form."""

    dl_thread = None  # running DownloadThread, if any
    diag_prog = None  # download progress dialog, created on first progress
    dl_started = None  # datetime at which the current download started
    # NOTE(review): pyqtSignal only works on QObject subclasses; confirm
    # Ui_ED_LRR is one. The signal is not used in this file.
    system_found = pyqtSignal("PyQt_PyObject")

    def __init__(self):
        super().__init__()
        self.config = cfg.load()
        self.jobs = {}

    def new_job(self, cls, *args, **kwargs):
        """Create a ``Job`` wrapping ``cls(*args, **kwargs)``.

        Jobs are keyed by ``cls.__name__``. A finished job of the same class
        is discarded and replaced.

        Returns:
            The new ``Job``, or ``None`` while a job of that class is still
            running, so callers never ``start()`` the same job twice.
        """
        print("CREATE JOB:", cls, args, kwargs)
        name = cls.__name__
        existing = self.jobs.get(name)
        if existing is not None:
            if not existing.done():
                return None
            del self.jobs[name]
        self.jobs[name] = Job(self.app, self.main_window, cls, *args, **kwargs)
        return self.jobs[name]

    def get_open_file(self, filetypes, callback=None):
        """Ask for an existing file.

        Returns the chosen path, or ``callback(path)`` when a callback is
        given. If the dialog is cancelled the empty string is returned and
        the callback is not invoked.
        """
        fileName, _ = QFileDialog.getOpenFileName(
            self.main_window,
            "Open file",
            str(cfg.data_dir),
            filetypes,
            options=QFileDialog.DontUseNativeDialog,
        )
        if not fileName:
            # Cancelled: do not feed an empty path to the callback.
            return fileName
        if callback:
            return callback(fileName)
        return fileName

    def get_save_file(self, filetypes, callback=None):
        """Ask for a destination file.

        Returns the chosen path, or ``callback(path)`` when a callback is
        given. If the dialog is cancelled the empty string is returned and
        the callback is not invoked.
        """
        fileName, _ = QFileDialog.getSaveFileName(
            self.main_window,
            "Save file",
            str(cfg.data_dir),
            filetypes,
            options=QFileDialog.DontUseNativeDialog,
        )
        if not fileName:
            # Cancelled: do not feed an empty path to the callback.
            return fileName
        if callback:
            return callback(fileName)
        return fileName

    def set_sys_lst(self, path):
        """Remember ``path`` as a system list and select it in both inputs."""
        if path not in self.config.history_out_path:
            self.config.history_out_path.append(path)
            self.inp_sys_lst.addItem(path)
            self.inp_out_pp.addItem(path)
        self.inp_sys_lst.setCurrentText(path)
        self.inp_out_pp.setCurrentText(path)

    def set_bodies_file(self, path):
        """Add ``path`` to the bodies-file history and its dropdown."""
        if path not in self.config.history_bodies_path:
            self.config.history_bodies_path.append(path)
            self.inp_bodies_pp.addItem(path)

    def set_systems_file(self, path):
        """Add ``path`` to the systems-file history and its dropdown."""
        if path not in self.config.history_systems_path:
            self.config.history_systems_path.append(path)
            self.inp_systems_pp.addItem(path)

    def update_dropdowns(self):
        """Placeholder; currently does nothing."""
        return

    def log(self, *args):
        """Append a timestamped line to the log widget.

        ``args[0]`` is a ``str.format`` template and the remaining
        arguments are its fields.
        """
        t = datetime.today()
        msg_t = "[{}] {}".format(t, str.format(*args))
        self.txt_log.append(msg_t)

    def set_comp_mode(self, _):
        """Enable or disable route-only widgets to match the radio buttons."""
        # Stays None if neither radio button is checked when the slot fires.
        comp_mode = None
        if self.rd_comp.isChecked():
            comp_mode = "Compute Route"
            self.btn_add.setText("Add")
        if self.rd_precomp.isChecked():
            comp_mode = "Precompute Graph"
            self.btn_add.setText("Select")
        # log() formats its first argument, so the value needs a placeholder.
        self.log("COMP_MODE: {}", comp_mode)
        route_enabled = self.rd_comp.isChecked()
        for widget in (
            self.lst_sys,
            self.btn_rm,
            self.cmb_mode,
            self.chk_permute,
            self.lbl_keep,
            self.lbl_mode,
            self.chk_permute_keep_first,
            self.chk_permute_keep_last,
        ):
            widget.setEnabled(route_enabled)
        # Passing True (precompute mode) matches no mode name, which
        # disables the greedyness controls; None re-reads the combo box.
        self.set_route_mode(self.rd_precomp.isChecked() or None)

    def set_route_mode(self, mode=None):
        """Enable the greedyness controls only for the "A*-Search" mode."""
        if mode is None:
            mode = self.cmb_mode.currentText()
        is_astar = mode == "A*-Search"
        self.lbl_greedyness.setEnabled(is_astar)
        self.sld_greedyness.setEnabled(is_astar)

    def set_greedyness(self, value):
        """Show the slider ``value`` as a percentage in the label."""
        self.lbl_greedyness.setText("Greedyness Factor ({:.0%})".format(value / 100))

    @property
    def systems(self):
        """All rows of the system list as dicts (see ``sys_to_dict``)."""
        return [
            self.sys_to_dict(n) for n in range(self.lst_sys.topLevelItemCount())
        ]

    def sys_to_dict(self, n):
        """Return row ``n`` as ``{header label: cell value, "id": ...}``.

        ``"id"`` is the row's ``__id__`` attribute (set by
        ``resolve_systems``) or ``None`` if the row is unresolved.
        """
        header_item = self.lst_sys.headerItem()
        row_item = self.lst_sys.topLevelItem(n)
        header = [header_item.data(c, 0) for c in range(header_item.columnCount())]
        system = [row_item.data(c, 0) for c in range(row_item.columnCount())]
        ret = dict(zip(header, system))
        ret["id"] = getattr(row_item, "__id__", None)
        # Drop the entry of a column that has no header label.
        ret.pop(None, None)
        return ret

    def error(self, msg):
        """Show ``msg`` in a modal error box."""
        QMessageBox.critical(self.main_window, "ERROR!", msg)

    def get_sys_list(self):
        """Return the selected system list as a ``Path``.

        Shows an error box and returns ``None`` if no path is entered or
        the file does not exist.
        """
        if not self.inp_sys_lst.currentText():
            self.error("System list is required!")
            return
        path = pathlib.Path(self.inp_sys_lst.currentText())
        if not path.exists():
            self.error("System list does not exist, run download and preprocess first!")
            return
        return path

    def route_progress(self, job, state):
        """Debug hook: print a routing progress update."""
        print("RP:", job, state)

    def run(self):
        """Collect the routing parameters from the UI and start a Router job."""
        # The property walks the tree widget, so read it only once.
        current = self.systems
        if not all(s["Type"] for s in current):
            self.error('Not all systens have been resolved, please click "Search All"')
            return
        print(current)
        systems = [str(s["id"]) for s in current]
        jump_range = self.sb_range.value()
        mode = self.cmb_mode.currentText()
        primary = self.chk_primary.isChecked()
        keep_first = self.chk_permute_keep_first.isChecked()
        keep_last = self.chk_permute_keep_last.isChecked()
        permute = self.chk_permute.isChecked()
        greedyness = (
            self.sld_greedyness.value() / 100
            if self.sld_greedyness.isEnabled()
            else None
        )
        path = self.get_sys_list()
        if path is None:
            return
        precomp = None
        path = str(path)
        mode = {
            "Breadth-First Search": "bfs",
            "A*-Search": "astar",
            "Greedy-Search": "greedy",
        }[mode]
        print(
            systems,
            jump_range,
            mode,
            primary,
            permute,
            (keep_first, keep_last),
            greedyness,
            path,
            precomp,
        )
        # NOTE(review): the meaning of the positional 0.1 is not visible
        # here; confirm against Router's signature.
        route_job = self.new_job(
            Router,
            systems,
            jump_range,
            0.1,
            mode,
            primary,
            permute,
            keep_first,
            keep_last,
            greedyness,
            precomp,
            path,
        )
        if route_job:
            self.route_progress_dialog = ProgressDialog(
                "Computing route...", "Cancel", 0, 100, self.main_window
            )
            self.route_progress_dialog.canceled.connect(route_job.cancel)
            route_job.start()
            self.route_progress_dialog.show()
        else:
            self.error("Another route job is already running!")

    def find_sys_by_names(self, names):
        """Look ``names`` up in the selected system list via ``_ed_lrr.find_sys``.

        Returns ``None`` if no usable system list is selected.
        """
        t_s = datetime.today()
        if not self.get_sys_list():
            return None
        # TODO: start thread/subprocess
        ret = _ed_lrr.find_sys(names, self.inp_sys_lst.currentText())
        print("Took:", datetime.today() - t_s)
        return ret

    def resolve_systems(self):
        """Replace each row's name and type with the best match found."""
        # TODO: show spinner
        names = [s["Name"] for s in self.systems]
        systems = self.find_sys_by_names(names)
        if systems is None:
            return
        for i, name in enumerate(names):
            # presumably each value is a (difference, system) pair; verify
            # against _ed_lrr.find_sys
            _, system = systems[name]
            item = self.lst_sys.topLevelItem(i)
            item.setData(0, 0, system["system"])
            item.setData(1, 0, system["star_type"])
            item.__id__ = system["id"]

    def add_system(self):
        """Append the name in the input field as a new, unresolved row."""
        name = self.inp_sys.text()
        item = QTreeWidgetItem(self.lst_sys, [name, None])
        item.resolved = False
        item.setFlags(item.flags() & ~Qt.ItemIsDropEnabled)

    def remove_system(self):
        """Remove all selected rows from the system list."""
        root = self.lst_sys.invisibleRootItem()
        for item in self.lst_sys.selectedItems():
            root.removeChild(item)

    def dl_canceled(self):
        """Stop the running download and close its progress dialog."""
        thread, dialog = self.dl_thread, self.diag_prog
        if not thread:
            return
        # Detach the state first: closing the dialog can emit ``canceled``
        # again, and the nested call must be a no-op.
        self.dl_thread = None
        self.diag_prog = None
        self.dl_started = None
        print("Cancel!")
        try:
            thread.progress.disconnect()
        except TypeError:
            # Nothing was connected.
            pass
        thread.stop()
        thread.wait()
        if dialog is not None:
            dialog.close()

    def dl_finished(self):
        """Reset the download state once the download thread has finished.

        Without this, ``dl_thread`` would stay set after a completed
        download and ``run_download`` would ignore every later request.
        """
        if self.dl_thread is None:
            # Already cleaned up by dl_canceled.
            return
        dialog = self.diag_prog
        self.dl_thread = None
        self.diag_prog = None
        self.dl_started = None
        if dialog is not None:
            dialog.close()

    def handle_dl_progress(self, args):
        """Update the download dialog from a ``DownloadThread`` progress dict."""
        if self.dl_started is None:
            # Late update for a download that was already cleaned up.
            return
        filename = os.path.split(args["outfile"])[-1]
        if self.diag_prog is None:
            self.diag_prog = ProgressDialog("", "Cancel", 0, 1000, self.main_window)
            if self.dl_thread:
                self.diag_prog.canceled.connect(self.dl_canceled)
            self.diag_prog.show()
        done = args["done"]
        size = args["size"]
        t_elapsed = datetime.today() - self.dl_started
        elapsed_s = t_elapsed.total_seconds()
        rate = done / elapsed_s if elapsed_s > 0 else 0.0
        # The remaining time is only meaningful with a positive rate and a
        # known total size (size is 0 without a Content-Length header).
        t_rem = "-"
        if rate > 0 and size >= done:
            try:
                t_rem = t_round(timedelta(seconds=(size - done) / rate))
            except OverflowError:
                t_rem = "-"
        rate = round(rate, 2)
        msg = "Downloading {} [{}/{}] ({}/s)\n[{}/{}]".format(
            filename,
            sizeof_fmt(done),
            sizeof_fmt(size),
            sizeof_fmt(rate),
            t_round(t_elapsed),
            t_rem,
        )
        self.diag_prog.setLabelText(msg)
        self.diag_prog.setWindowTitle("Downloading EDSM Dumps")
        if size > 0:
            self.diag_prog.setValue(min(1000, (done * 1000) // size))

    def run_download(self):
        """Start downloading both dumps unless a download is already running."""
        if self.dl_thread:
            return
        self.dl_started = datetime.today()
        self.dl_thread = DownloadThread(
            self.inp_systems_dl.currentText(),
            self.inp_systems_dest_dl.currentText(),
            self.inp_bodies_dl.currentText(),
            self.inp_bodies_dest_dl.currentText(),
        )
        self.dl_thread.progress.connect(self.handle_dl_progress)
        self.dl_thread.finished.connect(self.dl_finished)
        self.dl_thread.start()
        print(".")

    def update_permute_chk(self, state):
        """Enable the keep-first/keep-last controls according to ``state``."""
        self.chk_permute_keep_first.setEnabled(state)
        self.chk_permute_keep_last.setEnabled(state)
        self.lbl_keep.setEnabled(state)

    def setup_signals(self):
        """Wire widget signals to their handlers and preset the defaults."""
        self.btn_download.clicked.connect(self.run_download)
        # NOTE(review): hard-coded developer-machine paths; consider
        # deriving the defaults from cfg.data_dir.
        self.inp_systems_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\s.json")
        self.inp_bodies_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\b.json")
        self.set_greedyness(self.sld_greedyness.value())
        self.cmb_mode.currentTextChanged.connect(self.set_route_mode)
        self.rd_comp.toggled.connect(self.set_comp_mode)
        self.rd_precomp.toggled.connect(self.set_comp_mode)
        self.sld_greedyness.valueChanged.connect(self.set_greedyness)
        self.btn_go.clicked.connect(self.run)
        self.btn_add.clicked.connect(self.add_system)
        self.btn_rm.clicked.connect(self.remove_system)
        self.chk_permute.stateChanged.connect(self.update_permute_chk)
        self.btn_search.clicked.connect(self.resolve_systems)
        self.btn_out_browse_pp.clicked.connect(
            lambda: self.get_save_file("CSV File (*.csv)", self.set_sys_lst)
        )
        self.btn_sys_lst_browse.clicked.connect(
            lambda: self.get_open_file("CSV File (*.csv)", self.set_sys_lst)
        )
        self.btn_bodies_browse_pp.clicked.connect(
            lambda: self.get_open_file("JSON File (*.json)", self.set_bodies_file)
        )
        self.btn_bodies_dest_browse_dl.clicked.connect(
            lambda: self.get_save_file("JSON File (*.json)", self.set_bodies_file)
        )
        self.btn_systems_browse_pp.clicked.connect(
            lambda: self.get_open_file("JSON File (*.json)", self.set_systems_file)
        )
        self.btn_systems_dest_browse_dl.clicked.connect(
            lambda: self.get_save_file("JSON File (*.json)", self.set_systems_file)
        )

    def handle_close(self):
        """Write the configuration back via ``cfg.write``."""
        cfg.write(self.config)
        print("BYEEEEEE!")

    def setup_styles(self, win, app):
        """Add one menu action per palette registered on ``app``."""
        for name in app.styles:
            action = QAction(app)
            action.setObjectName("action_load_style_" + name)
            action.setText(name)
            # Bind ``name`` as a default so each action keeps its own style.
            action.triggered.connect(lambda _, name=name: app.set_style(name))
            self.menuStyle.addAction(action)

    def setupUi(self, MainWindow, app):
        """Build the generated UI, then attach signals, headers and styles."""
        super().setupUi(MainWindow)
        self.update_dropdowns()
        self.main_window = MainWindow
        self.app = app
        self.setup_signals()
        self.lst_sys.setHeaderLabels(["Name", "Type"])
        self.set_route_mode()
        self.update_permute_chk(self.chk_permute.isChecked())
        self.setup_styles(MainWindow, app)


def main():
    """Create the application and main window, run the event loop, then exit."""
    MP.freeze_support()
    app = App()
    # NOTE(review): hard-coded developer-machine icon path.
    app.setWindowIcon(QIcon(r'D:\devel\rust\ed_lrr_gui\icon\icon.ico'))
    MainWindow = QMainWindow()
    MainWindow.setWindowIcon(QIcon(r'D:\devel\rust\ed_lrr_gui\icon\icon.ico'))
    ui = ED_LRR()
    ui.setupUi(MainWindow, app)
    MainWindow.show()
    ret = app.exec_()
    ui.handle_close()
    exit(ret)


if __name__ == "__main__":
    main()