Daniel Seiller
c290d5eb12
Implement route plotting in GUI Use batch fuzzy search to find systems search all systems at once after adding added some error checking
562 lines
19 KiB
Python
562 lines
19 KiB
Python
import sys
|
|
import os
|
|
import requests as RQ
|
|
from datetime import datetime, timedelta
|
|
from PyQt5.QtCore import QTimer, QThread, pyqtSignal, Qt, QObject
|
|
from PyQt5.QtWidgets import (
|
|
QMainWindow,
|
|
QApplication,
|
|
QFileDialog,
|
|
QProgressDialog,
|
|
QTreeWidgetItem,
|
|
QAction,
|
|
QMessageBox,
|
|
)
|
|
from urllib.request import Request, urlopen
|
|
import gzip
|
|
import pathlib
|
|
from PyQt5.QtGui import QPalette, QColor
|
|
import ed_lrr_gui
|
|
import ed_lrr_gui.config as cfg
|
|
from ed_lrr_gui.gui.ed_lrr import Ui_ED_LRR
|
|
from ed_lrr_gui import Router, Preprocessor
|
|
import multiprocessing as MP
|
|
import queue
|
|
import csv
|
|
import _ed_lrr
|
|
|
|
|
|
def sizeof_fmt(num, suffix="B"):
|
|
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
|
|
if abs(num) < 1024.0:
|
|
return "{:.02f}{}{}".format(num, unit, suffix)
|
|
num /= 1024.0
|
|
return "{:.02f}{}{}".format(num, "Yi", suffix)
|
|
|
|
|
|
def t_round(dt):
|
|
return dt - dt % timedelta(seconds=1)
|
|
|
|
|
|
class ProgressDialog(QProgressDialog):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.setWindowModality(Qt.WindowModal)
|
|
|
|
|
|
class Job(QObject):
|
|
progress = pyqtSignal("PyQt_PyObject")
|
|
|
|
def __init__(self, app, cls, *args, build_progress=None, **kwargs):
|
|
super().__init__()
|
|
self.job = cls(*args, **kwargs)
|
|
self.timer = QTimer(app)
|
|
self.app = app
|
|
self.timer.timeout.connect(self.interval)
|
|
self.timer.start(100)
|
|
self.last_val = None
|
|
self.build_progress = build_progress
|
|
self.progress_dialog = None
|
|
self.state = {}
|
|
|
|
def start(self):
|
|
"""
|
|
if self.diag_prog is None:
|
|
self.diag_prog = ProgressDialog("", "Cancel", 0, 1000, self.main_window)
|
|
if self.dl_thread:
|
|
self.diag_prog.canceled.connect(self.dl_canceled)
|
|
self.diag_prog.show()
|
|
t_elapsed = datetime.today() - self.dl_started
|
|
rate = args["done"] / t_elapsed.total_seconds()
|
|
remaining = (args["size"] - args["done"]) / rate
|
|
rate = round(rate, 2)
|
|
# print(rate, remaining)
|
|
try:
|
|
t_rem = timedelta(seconds=remaining)
|
|
except OverflowError:
|
|
t_rem = "-"
|
|
msg = "Downloading {} [{}/{}] ({}/s)\n[{}/{}]".format(
|
|
filename,
|
|
sizeof_fmt(args["done"]),
|
|
sizeof_fmt(args["size"]),
|
|
sizeof_fmt(rate),
|
|
t_round(t_elapsed),
|
|
t_round(t_rem),
|
|
)
|
|
self.diag_prog.setLabelText(msg)
|
|
self.diag_prog.setWindowTitle("Downloading EDSM Dumps")
|
|
self.diag_prog.setValue((args["done"] * 1000) // args["size"])
|
|
"""
|
|
self.started = datetime.today()
|
|
if self.build_progress:
|
|
self.progress_dialog = ProgressDialog("", "Cancel", 0, 1000, self.app)
|
|
self.progress_dialog.canceled.connect(self.cancel)
|
|
self.progress_dialog.show()
|
|
self.progress.connect(self.__build_progress)
|
|
else:
|
|
self.progress.connect(
|
|
lambda *args, **kwargs: print("PROGRESS:", *args, **kwargs)
|
|
)
|
|
return self.job.start()
|
|
|
|
def __build_progress(self, *args, **kwargs):
|
|
kwargs["self"] = self
|
|
return self.build_progress(*args, **kwargs)
|
|
|
|
def cancel(self):
|
|
self.job.terminate()
|
|
self.job = None
|
|
|
|
def done(self):
|
|
return (self.job.is_alive() == False) and (self.job.queue.empty())
|
|
|
|
def interval(self):
|
|
while True:
|
|
try:
|
|
res = self.job.queue.get(True, 0.1)
|
|
except queue.Empty:
|
|
return
|
|
if res == self.last_val:
|
|
continue
|
|
self.state.update(res)
|
|
self.progress.emit(self.state)
|
|
self.last_val = res
|
|
|
|
|
|
class DownloadThread(QThread):
|
|
progress = pyqtSignal("PyQt_PyObject")
|
|
|
|
def __init__(self, systems_url, systems_file, bodies_url, bodies_file):
|
|
super().__init__()
|
|
self.systems_url = systems_url
|
|
self.systems_file = systems_file
|
|
self.bodies_url = bodies_url
|
|
self.bodies_file = bodies_file
|
|
self.running = True
|
|
|
|
def __del__(self):
|
|
self.wait()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
|
|
def run(self):
|
|
jobs = [
|
|
(self.systems_url, self.systems_file),
|
|
(self.bodies_url, self.bodies_file),
|
|
]
|
|
for url, dest in jobs:
|
|
outfile = url.split("/")[-1]
|
|
size = RQ.head(url, headers={"Accept-Encoding": "None"})
|
|
size.raise_for_status()
|
|
size = int(size.headers.get("Content-Length", 0))
|
|
with open(dest, "wb") as of:
|
|
resp = RQ.get(url, stream=True)
|
|
for chunk in resp.iter_content(1024 * 1024):
|
|
of.write(chunk)
|
|
self.progress.emit(
|
|
{"done": of.tell(), "size": size, "outfile": outfile}
|
|
)
|
|
if not self.running:
|
|
return
|
|
|
|
|
|
class App(QApplication):
|
|
def __init__(self):
|
|
super().__init__(sys.argv)
|
|
self.setStyle("Fusion")
|
|
self.setup_styles()
|
|
|
|
def set_style(self, style):
|
|
print("LOAD:", style)
|
|
self.setPalette(self.styles[style])
|
|
|
|
def setup_styles(self):
|
|
self.styles = {}
|
|
styles = {
|
|
"Dark": {
|
|
"Window": QColor(53, 53, 53),
|
|
"WindowText": Qt.white,
|
|
"Base": QColor(15, 15, 15),
|
|
"AlternateBase": QColor(53, 53, 53),
|
|
"ToolTipBase": Qt.white,
|
|
"ToolTipText": Qt.white,
|
|
"Text": Qt.white,
|
|
"Button": QColor(53, 53, 53),
|
|
"ButtonText": Qt.white,
|
|
"BrightText": Qt.red,
|
|
"Highlight": QColor(255, 128, 0),
|
|
"HighlightedText": Qt.black,
|
|
}
|
|
}
|
|
for style, colors in styles.items():
|
|
palette = QPalette()
|
|
for entry, color in colors.items():
|
|
palette.setColor(getattr(QPalette, entry), color)
|
|
if color == Qt.darkGray:
|
|
palette.setColor(
|
|
QPalette.Disabled, getattr(QPalette, entry), QColor(53, 53, 53)
|
|
)
|
|
else:
|
|
palette.setColor(
|
|
QPalette.Disabled, getattr(QPalette, entry), Qt.darkGray
|
|
)
|
|
self.styles[style] = palette
|
|
self.styles["Light"] = self.style().standardPalette()
|
|
|
|
|
|
class ED_LRR(Ui_ED_LRR):
|
|
dl_thread = None
|
|
diag_prog = None
|
|
dl_started = None
|
|
system_found = pyqtSignal("PyQt_PyObject")
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.config = cfg.load()
|
|
self.jobs = {}
|
|
|
|
def start_job(self, cls, *args, **kwargs):
|
|
print("START JOB:", cls, args, kwargs)
|
|
name = cls.__name__
|
|
if name in self.jobs and self.jobs[name].done():
|
|
del self.jobs[name]
|
|
if not name in self.jobs:
|
|
self.jobs[name] = Job(self.app, cls, *args, **kwargs)
|
|
self.jobs[name].start()
|
|
|
|
def get_open_file(self, filetypes, callback=None):
|
|
fileName, _ = QFileDialog.getOpenFileName(
|
|
self.main_window,
|
|
"Open file",
|
|
str(cfg.data_dir),
|
|
filetypes,
|
|
options=QFileDialog.DontUseNativeDialog,
|
|
)
|
|
if callback:
|
|
return callback(fileName)
|
|
return fileName
|
|
|
|
def get_save_file(self, filetypes, callback=None):
|
|
fileName, _ = QFileDialog.getSaveFileName(
|
|
self.main_window,
|
|
"Save file",
|
|
str(cfg.data_dir),
|
|
filetypes,
|
|
options=QFileDialog.DontUseNativeDialog,
|
|
)
|
|
if callback:
|
|
return callback(fileName)
|
|
return fileName
|
|
|
|
def set_sys_lst(self, path):
|
|
if path not in self.config.history_out_path:
|
|
self.config.history_out_path.append(path)
|
|
self.inp_sys_lst.addItem(path)
|
|
self.inp_out_pp.addItem(path)
|
|
self.inp_sys_lst.setCurrentText(path)
|
|
self.inp_out_pp.setCurrentText(path)
|
|
|
|
def set_bodies_file(self, path):
|
|
if path not in self.config.history_bodies_path:
|
|
self.config.history_bodies_path.append(path)
|
|
self.inp_bodies_pp.addItem(path)
|
|
|
|
def set_systems_file(self, path):
|
|
if path not in self.config.history_systems_path:
|
|
self.config.history_systems_path.append(path)
|
|
self.inp_systems_pp.addItem(path)
|
|
|
|
def update_dropdowns(self):
|
|
return
|
|
|
|
def log(self, *args):
|
|
t = datetime.today()
|
|
msg_t = "[{}] {}".format(t, str.format(*args))
|
|
self.txt_log.append(msg_t)
|
|
|
|
def set_comp_mode(self, _):
|
|
if self.rd_comp.isChecked():
|
|
comp_mode = "Compute Route"
|
|
self.btn_add.setText("Add")
|
|
if self.rd_precomp.isChecked():
|
|
comp_mode = "Precompute Graph"
|
|
self.btn_add.setText("Select")
|
|
self.log("COMP_MODE", comp_mode)
|
|
self.lst_sys.setEnabled(self.rd_comp.isChecked())
|
|
self.btn_rm.setEnabled(self.rd_comp.isChecked())
|
|
self.cmb_mode.setEnabled(self.rd_comp.isChecked())
|
|
self.chk_permute.setEnabled(self.rd_comp.isChecked())
|
|
self.lbl_keep.setEnabled(self.rd_comp.isChecked())
|
|
self.lbl_mode.setEnabled(self.rd_comp.isChecked())
|
|
self.chk_permute_keep_first.setEnabled(self.rd_comp.isChecked())
|
|
self.chk_permute_keep_last.setEnabled(self.rd_comp.isChecked())
|
|
self.set_route_mode(self.rd_precomp.isChecked() or None)
|
|
|
|
def set_route_mode(self, mode=None):
|
|
if mode == None:
|
|
mode = self.cmb_mode.currentText()
|
|
self.lbl_greedyness.setEnabled(mode == "A*-Search")
|
|
self.sld_greedyness.setEnabled(mode == "A*-Search")
|
|
|
|
def set_greedyness(self, value):
|
|
self.lbl_greedyness.setText("Greedyness Factor ({:.0%})".format(value / 100))
|
|
|
|
@property
|
|
def systems(self):
|
|
ret = []
|
|
for n in range(self.lst_sys.topLevelItemCount()):
|
|
ret.append(self.sys_to_dict(n))
|
|
return ret
|
|
|
|
def sys_to_dict(self, n):
|
|
header = [
|
|
self.lst_sys.headerItem().data(c, 0)
|
|
for c in range(self.lst_sys.headerItem().columnCount())
|
|
]
|
|
system = [
|
|
self.lst_sys.topLevelItem(n).data(c, 0)
|
|
for c in range(self.lst_sys.topLevelItem(n).columnCount())
|
|
]
|
|
ret = dict(zip(header, system))
|
|
ret["id"] = getattr(self.lst_sys.topLevelItem(n), "__id__", None)
|
|
ret.pop(None, None)
|
|
return ret
|
|
|
|
def error(self, msg):
|
|
QMessageBox.critical(self.main_window, "ERROR!", msg)
|
|
|
|
def get_sys_list(self):
|
|
if not self.inp_sys_lst.currentText():
|
|
self.error("System list is required!")
|
|
return
|
|
path = pathlib.Path(self.inp_sys_lst.currentText())
|
|
if not path.exists():
|
|
self.error("System list does not exist, run download and preprocess first!")
|
|
return
|
|
return path
|
|
|
|
def run(self):
|
|
if not all(s["Type"] for s in self.systems):
|
|
self.error('Not all systens have been resolved, please click "Search All"')
|
|
return
|
|
print(self.systems)
|
|
systems = [str(s["id"]) for s in self.systems]
|
|
jump_range = self.sb_range.value()
|
|
mode = self.cmb_mode.currentText()
|
|
primary = self.chk_primary.isChecked()
|
|
keep_first = self.chk_permute_keep_first.isChecked()
|
|
keep_last = self.chk_permute_keep_last.isChecked()
|
|
permute = self.chk_permute.isChecked()
|
|
greedyness = (
|
|
self.sld_greedyness.value() / 100
|
|
if self.sld_greedyness.isEnabled()
|
|
else None
|
|
)
|
|
path = self.get_sys_list()
|
|
if path is None:
|
|
return
|
|
precomp = path.with_suffix(".idx")
|
|
path = str(path)
|
|
if not precomp.exists():
|
|
precomp = None
|
|
else:
|
|
precomp = str(precomp)
|
|
mode = {
|
|
"Breadth-First Search": "bfs",
|
|
"A*-Search": "astar",
|
|
"Greedy-Search": "greedy",
|
|
}[mode]
|
|
print(
|
|
systems,
|
|
jump_range,
|
|
mode,
|
|
primary,
|
|
permute,
|
|
(keep_first, keep_last),
|
|
greedyness,
|
|
path,
|
|
precomp,
|
|
)
|
|
self.start_job(
|
|
Router,
|
|
systems,
|
|
jump_range,
|
|
0.1,
|
|
mode,
|
|
primary,
|
|
permute,
|
|
keep_first,
|
|
keep_last,
|
|
greedyness,
|
|
precomp,
|
|
path,
|
|
)
|
|
|
|
def find_sys_by_names(self, names):
|
|
t_s = datetime.today()
|
|
if not self.get_sys_list():
|
|
return None
|
|
ret = _ed_lrr.find_sys(names, self.inp_sys_lst.currentText())
|
|
print("Took:", datetime.today() - t_s)
|
|
return ret
|
|
|
|
def resolve_systems(self):
|
|
names = []
|
|
for n in range(self.lst_sys.topLevelItemCount()):
|
|
names.append(self.sys_to_dict(n)["Name"])
|
|
systems = self.find_sys_by_names(names)
|
|
if systems is None:
|
|
return
|
|
for i, name in enumerate(names):
|
|
_, system = systems[name]
|
|
self.lst_sys.topLevelItem(i).setData(0, 0, system["system"])
|
|
self.lst_sys.topLevelItem(i).setData(1, 0, system["star_type"])
|
|
self.lst_sys.topLevelItem(i).__id__ = system["id"]
|
|
# diff, item = self.find_sys_by_name(name)
|
|
# print("Found", (diff, item))
|
|
|
|
def add_system(self):
|
|
name = self.inp_sys.text()
|
|
item = QTreeWidgetItem(self.lst_sys, [name, None])
|
|
item.resolved = False
|
|
item.setFlags(item.flags() & ~Qt.ItemIsDropEnabled)
|
|
|
|
def remove_system(self):
|
|
root = self.lst_sys.invisibleRootItem()
|
|
for item in self.lst_sys.selectedItems():
|
|
root.removeChild(item)
|
|
|
|
def dl_canceled(self):
|
|
if self.dl_thread:
|
|
print("Cancel!")
|
|
try:
|
|
self.dl_thread.progress.disconnect()
|
|
except TypeError:
|
|
pass
|
|
self.dl_thread.stop()
|
|
self.dl_thread.wait()
|
|
self.diag_prog.close()
|
|
self.dl_thread = None
|
|
self.diag_prog = None
|
|
self.dl_started = None
|
|
|
|
def handle_dl_progress(self, args):
|
|
filename = os.path.split(args["outfile"])[-1]
|
|
if self.diag_prog is None:
|
|
self.diag_prog = ProgressDialog("", "Cancel", 0, 1000, self.main_window)
|
|
if self.dl_thread:
|
|
self.diag_prog.canceled.connect(self.dl_canceled)
|
|
self.diag_prog.show()
|
|
t_elapsed = datetime.today() - self.dl_started
|
|
rate = args["done"] / t_elapsed.total_seconds()
|
|
remaining = (args["size"] - args["done"]) / rate
|
|
rate = round(rate, 2)
|
|
# print(rate, remaining)
|
|
try:
|
|
t_rem = timedelta(seconds=remaining)
|
|
except OverflowError:
|
|
t_rem = "-"
|
|
msg = "Downloading {} [{}/{}] ({}/s)\n[{}/{}]".format(
|
|
filename,
|
|
sizeof_fmt(args["done"]),
|
|
sizeof_fmt(args["size"]),
|
|
sizeof_fmt(rate),
|
|
t_round(t_elapsed),
|
|
t_round(t_rem),
|
|
)
|
|
self.diag_prog.setLabelText(msg)
|
|
self.diag_prog.setWindowTitle("Downloading EDSM Dumps")
|
|
self.diag_prog.setValue((args["done"] * 1000) // args["size"])
|
|
|
|
def run_download(self):
|
|
if self.dl_thread:
|
|
return
|
|
self.dl_started = datetime.today()
|
|
self.dl_thread = DownloadThread(
|
|
self.inp_systems_dl.currentText(),
|
|
self.inp_systems_dest_dl.currentText(),
|
|
self.inp_bodies_dl.currentText(),
|
|
self.inp_bodies_dest_dl.currentText(),
|
|
)
|
|
self.dl_thread.progress.connect(self.handle_dl_progress)
|
|
self.dl_thread.start()
|
|
print(".")
|
|
|
|
def update_permute_chk(self, state):
|
|
self.chk_permute_keep_first.setEnabled(state)
|
|
self.chk_permute_keep_last.setEnabled(state)
|
|
self.lbl_keep.setEnabled(state)
|
|
|
|
def setup_signals(self):
|
|
self.btn_download.clicked.connect(self.run_download)
|
|
self.inp_systems_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\s.json")
|
|
self.inp_bodies_dest_dl.setCurrentText(r"D:\devel\rust\ed_lrr_gui\DL\b.json")
|
|
self.set_greedyness(self.sld_greedyness.value())
|
|
self.cmb_mode.currentTextChanged.connect(self.set_route_mode)
|
|
self.rd_comp.toggled.connect(self.set_comp_mode)
|
|
self.rd_precomp.toggled.connect(self.set_comp_mode)
|
|
self.sld_greedyness.valueChanged.connect(self.set_greedyness)
|
|
self.btn_go.clicked.connect(self.run)
|
|
self.btn_add.clicked.connect(self.add_system)
|
|
self.btn_rm.clicked.connect(self.remove_system)
|
|
self.chk_permute.stateChanged.connect(self.update_permute_chk)
|
|
self.btn_search.clicked.connect(self.resolve_systems)
|
|
self.btn_out_browse_pp.clicked.connect(
|
|
lambda: self.get_save_file("CSV File (*.csv)", self.set_sys_lst)
|
|
)
|
|
self.btn_sys_lst_browse.clicked.connect(
|
|
lambda: self.get_open_file("CSV File (*.csv)", self.set_sys_lst)
|
|
)
|
|
|
|
self.btn_bodies_browse_pp.clicked.connect(
|
|
lambda: self.get_open_file("JSON File (*.json)", self.set_bodies_file)
|
|
)
|
|
self.btn_bodies_dest_browse_dl.clicked.connect(
|
|
lambda: self.get_save_file("JSON File (*.json)", self.set_bodies_file)
|
|
)
|
|
self.btn_systems_browse_pp.clicked.connect(
|
|
lambda: self.get_open_file("JSON File (*.json)", self.set_systems_file)
|
|
)
|
|
self.btn_systems_dest_browse_dl.clicked.connect(
|
|
lambda: self.get_save_file("JSON File (*.json)", self.set_systems_file)
|
|
)
|
|
|
|
def handle_close(self):
|
|
cfg.write(self.config)
|
|
print("BYEEEEEE!")
|
|
|
|
def setup_styles(self, win, app):
|
|
for name in app.styles:
|
|
action = QAction(app)
|
|
action.setObjectName("action_load_style_" + name)
|
|
action.setText(name)
|
|
action.triggered.connect(lambda _, name=name: app.set_style(name))
|
|
self.menuStyle.addAction(action)
|
|
|
|
def setupUi(self, MainWindow, app):
|
|
super().setupUi(MainWindow)
|
|
self.update_dropdowns()
|
|
self.main_window = MainWindow
|
|
self.app = app
|
|
self.setup_signals()
|
|
self.lst_sys.setHeaderLabels(["Name", "Type"])
|
|
self.set_route_mode()
|
|
self.update_permute_chk(self.chk_permute.isChecked())
|
|
self.setup_styles(MainWindow, app)
|
|
|
|
|
|
def main():
|
|
app = App()
|
|
MainWindow = QMainWindow()
|
|
ui = ED_LRR()
|
|
ui.setupUi(MainWindow, app)
|
|
MainWindow.show()
|
|
ret = app.exec_()
|
|
ui.handle_close()
|
|
exit(ret)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
MP.freeze_support()
|
|
main()
|