# -*- coding: utf-8 -*-
"""Flask application for ED_LRR: models, job handling, admin views and routes."""
import os
import uuid
from concurrent.futures.process import BrokenProcessPool
from datetime import datetime, timedelta
from functools import wraps

import click
from flask import (
    Flask,
    jsonify,
    render_template,
    redirect,
    url_for,
    send_from_directory,
    request,
    flash,
    current_app,
)
from flask.cli import AppGroup
from flask_admin import Admin
from flask_admin.contrib.sqla import ModelView
from flask_bootstrap import Bootstrap
from flask_debugtoolbar import DebugToolbarExtension
from flask_executor import Executor
from flask_login import (
    LoginManager,
    current_user,
    logout_user,
    UserMixin,
    AnonymousUserMixin,
    login_user,
    login_required,
)
from flask_nav import Nav, register_renderer
from flask_nav.elements import Navbar, View
from flask_sqlalchemy import SQLAlchemy
from flask_wtf.csrf import CSRFProtect
from jinja2.exceptions import TemplateNotFound
from sqlalchemy.orm import relationship
from sqlalchemy.types import DateTime
from sqlalchemy_utils import generic_repr, JSONType, PasswordType, UUIDType
from webargs import fields, validate
from webargs.flaskparser import use_kwargs

from .forms import RouteForm, LoginForm, RegisterForm, ChangePasswordForm
from .utils import prepare_route, BootsrapRenderer, is_safe_url
import _ed_lrr as ed_lrr

# Fallback location of the star database; override with app.config["STARS_PATH"].
DEFAULT_STARS_PATH = r"D:\devel\rust\ED_LRR\stars.csv"

templates = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
app = Flask(__name__, template_folder=templates)
app.config.from_pyfile("config.py")
app.executor = executor = Executor(app)
app.db = db = SQLAlchemy(app)
app.bootstrap = bootstrap = Bootstrap(app)
app.csrf = csfr = CSRFProtect(app)
app.nav = nav = Nav(app)
app.login_manager = login_manager = LoginManager(app)
login_manager.login_view = "login"
login_manager.session_protection = "strong"
admin = Admin(app, name="ED_LRR", template_mode="bootstrap3")
# NOTE(review): debug mode and the debug toolbar are force-enabled here
# regardless of config.py -- confirm this never runs in production.
app.debug = True
app.toolbar = toolbar = DebugToolbarExtension(app)


def wants_json_response():
    """Return True when the client prefers JSON at least as much as HTML."""
    return (
        request.accept_mimetypes["application/json"]
        >= request.accept_mimetypes["text/html"]
    )


@app.errorhandler(422)
@app.errorhandler(400)
@app.errorhandler(500)
@app.errorhandler(404)
def handle_error(err):
    """Render an error as JSON or HTML depending on the Accept header.

    The 500 handler may receive an exception that is not an HTTPException,
    so a missing ``code`` attribute falls back to 500.
    """
    code = getattr(err, "code", None) or 500
    if wants_json_response():
        return jsonify(error=str(err), code=code), code
    candidates = ["error/{}.html".format(code), "error/default.html"]
    try:
        return render_template(candidates, error=err), code
    except TemplateNotFound:
        if hasattr(err, "get_response"):
            return err.get_response()
        return str(err), code


def role_required(*roles):
    """Decorator: allow the view only for users holding at least one of *roles*.

    Unauthenticated users and users without a matching role receive the login
    manager's ``unauthorized()`` response.
    """

    def wrapper(fn):
        @wraps(fn)
        def decorated_view(*args, **kwargs):
            # is_authenticated is a property, consistent with the rest of the file.
            if not current_user.is_authenticated:
                return current_app.login_manager.unauthorized()
            if not any(current_user.has_role(role) for role in roles):
                return current_app.login_manager.unauthorized()
            return fn(*args, **kwargs)

        return decorated_view

    return wrapper


@login_manager.user_loader
def load_user(user_name):
    """Load a user by primary key (the user name) for the session cookie."""
    return User.query.get(user_name)


@login_manager.request_loader
def load_user_from_header(header_val):
    """Authenticate via ``api_key`` query argument or ``X-API-Key`` header.

    Both sources are tried in that order; a missing, malformed or unknown key
    is skipped so that the next source still gets a chance.
    """
    candidates = (request.args.get("api_key"), request.headers.get("X-API-Key"))
    for raw_key in candidates:
        if not raw_key:
            continue
        try:
            api_key = uuid.UUID(raw_key)
        except ValueError:
            # Not a UUID at all -> cannot match any stored key.
            continue
        user = User.query.filter_by(api_key=api_key).one_or_none()
        if user:
            return user
    return None


def left_nav():
    """Build the left navigation bar; the Workers link needs a privileged role."""
    links = [
        View("Home", "index"),
        View("Route", "route"),
        View("Jobs", "status", job_id=None),
    ]
    if current_user.has_role("admin") or current_user.has_role("worker_host"):
        links.insert(2, View("Workers", "worker"))
    return Navbar("E:D LRR", *links)


def right_nav():
    """Build the right navigation bar depending on the login state."""
    links = [View("Login", "login"), View("Register", "register")]
    if current_user.is_authenticated:
        links = [View("Change Password", "change_password"), View("Logout", "logout")]
    if current_user.has_role("admin"):
        links = [View("Admin", "admin.index")] + links
    return Navbar("", *links)


register_renderer(app, "bootstrap4", BootsrapRenderer)
nav.register_element("left_nav", left_nav)
nav.register_element("right_nav", right_nav)


def compute_route(args, kwargs):
    """Executor entry point: forward to ``ed_lrr.route``."""
    return ed_lrr.route(*args, **kwargs)


class AnonymousUser(AnonymousUserMixin):
    """Anonymous user that exposes the same role API as ``User``."""

    def has_role(self, role):
        return False

    @property
    def roles(self):
        return []

    # NOTE(review): because of name mangling this binds a second property to
    # ``_AnonymousUser__set_roles``; ``roles`` itself stays read-only.
    @roles.setter
    def __set_roles(self, value):
        raise NotImplementedError


login_manager.anonymous_user = AnonymousUser


@generic_repr
class Worker(db.Model):
    """A worker, optionally owned by a user and bound to a current job."""

    id = db.Column(
        UUIDType(binary=False, native=False), primary_key=True, default=uuid.uuid4
    )
    name = db.Column(db.String, unique=True)
    current_job = db.Column(
        UUIDType(binary=False, native=False),
        db.ForeignKey("job.id"),
        nullable=True,
        default=None,
    )
    job = relationship("Job", backref="workers")
    last_active = db.Column(DateTime, nullable=True, default=None)
    owner_name = db.Column(
        db.String, db.ForeignKey("user.name"), nullable=True, index=True
    )
    owner = relationship("User", backref="workers")


# Association table for the many-to-many User <-> Role relationship.
user_roles = db.Table(
    "user_roles",
    db.Column("user_name", db.String, db.ForeignKey("user.name"), primary_key=True),
    db.Column("role_name", db.String, db.ForeignKey("role.name"), primary_key=True),
)


class Role(db.Model):
    """A named role that can be assigned to users."""

    name = db.Column(db.String, unique=True, index=True, primary_key=True)

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name


class User(db.Model, UserMixin):
    """Account identified by its name, with password, API key and roles."""

    name = db.Column(db.String, unique=True, index=True, primary_key=True)
    is_active = db.Column(db.Boolean, default=False)
    api_key = db.Column(
        UUIDType(binary=False, native=False),
        nullable=True,
        default=uuid.uuid4,
        index=True,
    )
    password = db.Column(PasswordType(schemes=["pbkdf2_sha512"], max_length=256))
    created = db.Column(DateTime, default=datetime.today)
    roles = db.relationship("Role", secondary="user_roles")

    def add_roles(self, roles):
        """Attach every role named in *roles* (must exist) and commit."""
        for role_name in roles:
            role = Role.query.filter_by(name=role_name).one()
            if role not in self.roles:
                self.roles.append(role)
        db.session.commit()

    def has_role(self, role_name):
        """Return True if this user holds the role called *role_name*."""
        return (
            Role.query.join(User.roles)
            .filter(User.name == self.name, Role.name == role_name)
            .count()
            > 0
        )

    def reset_api_key(self):
        """Replace the API key with a fresh random UUID and commit."""
        self.api_key = uuid.uuid4()
        db.session.add(self)
        db.session.commit()

    def get_id(self):
        return self.name

    def __repr__(self):
        return self.name


class Job(db.Model):
    """A stored route computation together with its progress state."""

    id = db.Column(
        UUIDType(binary=False, native=False), primary_key=True, default=uuid.uuid4
    )
    user_name = db.Column(
        db.String, db.ForeignKey("user.name"), nullable=True, index=True
    )
    func = db.Column(db.String)
    args = db.Column(JSONType)
    kwargs = db.Column(JSONType)
    # Callable default so every row gets its own dict instead of a shared one.
    state = db.Column(JSONType, default=lambda: {})
    priority = db.Column(db.Integer, default=0, nullable=True)
    created = db.Column(DateTime, default=datetime.today)
    finished = db.Column(DateTime, nullable=True, default=None)
    started = db.Column(DateTime, nullable=True, default=None)
    last_update = db.Column(DateTime, nullable=True, default=None)
    user = relationship("User", backref="jobs")

    # ============================================================

    def __repr__(self):
        return str(self.id)

    @property
    def future(self):
        """The executor future stored under this job's id, or None."""
        fut = executor.futures._futures.get(self.id)
        return fut

    @property
    def sort_key(self):
        """Tuple used by ``get_next`` to order jobs."""
        state_priorities = {
            "Queued": 0,
            "Starting": 1,
            "Error": 1,
            "Stalled": 1,
            "Running": 1,
        }
        status_key = state_priorities.get(self.status[1], -1) + 1
        user = 1 - int(self.user is not None)
        return (user, -status_key, self.priority, self.created)

    @property
    def age(self):
        """Time since creation, truncated to whole seconds."""
        dt = datetime.today() - self.created
        return dt - dt % timedelta(seconds=1)

    @classmethod
    def get_next(cls):
        """Return the first job (by ``sort_key``) that is not "Done", or None."""
        for job in sorted(cls.query.all(), key=lambda v: v.sort_key):
            if job.status[1] in ["Done"]:
                continue
            return job
        return None

    @property
    def status(self):
        """Return ``(bootstrap_class, label)`` derived from ``state``.

        Possible values:
            ("primary", "Done"), ("danger", "Error"), ("info", "Stalled"),
            ("success", "Running"), ("secondary", "Starting"),
            ("warning", "Queued")
        """
        state = self.state or {}
        if state.get("result"):
            return ("primary", "Done")
        if state.get("error"):
            return ("danger", "Error")
        if state.get("progress"):
            last_update = self.last_update
            # No progress report for more than ten minutes counts as stalled.
            if (
                last_update is not None
                and (datetime.today() - last_update).total_seconds() > (60 * 10)
            ):
                return ("info", "Stalled")
            return ("success", "Running")
        if self.started is not None:
            return ("secondary", "Starting")
        return ("warning", "Queued")

    # NOTE(review): the ``__set_*`` setters below are name-mangled, so they
    # create separate ``_Job__set_*`` properties; the public properties stay
    # read-only either way.
    @status.setter
    def __set_status(self):
        raise NotImplementedError

    @property
    def dict(self):
        """JSON-serialisable summary of the job."""
        return {
            "id": self.id,
            "args": self.args,
            "kwargs": self.kwargs,
            "state": self.state,
            "finished": self.finished,
            "created": self.created,
            "started": self.started,
        }

    @dict.setter
    def __set_dict(self, value):
        raise NotImplementedError

    @property
    def route(self):
        """The prepared result route, or None while there is no result."""
        try:
            return prepare_route(self.state["result"])
        except KeyError:
            return None

    @property
    def t_rem(self):
        """Estimated remaining time as a timedelta, or None if unknown."""
        if self.started is None:
            return None
        runtime = datetime.today() - self.started
        try:
            prc_done = self.state["progress"]["prc_done"]
        except KeyError:
            return None
        if prc_done == 0:
            return None
        t_rem = (runtime / prc_done) * (100 - prc_done)
        return timedelta(seconds=round(t_rem.total_seconds(), 0))

    @t_rem.setter
    def __set_t_rem(self, value):
        raise NotImplementedError

    @classmethod
    def new(cls, func, args=None, kwargs=None):
        """Create and commit a job for *func*, owned by the current user if any."""
        args = args or ()
        kwargs = kwargs or {}
        job = cls(args=args, kwargs=kwargs, func=func.__qualname__)
        job.__last_upd = 0.0
        if current_user.is_authenticated:
            job.user = current_user
        db.session.add(job)
        db.session.commit()
        return job

    def start(self):
        """Reset the job state and submit it to the executor.

        If submission fails with a broken pool or RuntimeError the executor
        is recreated once and the submission retried.
        """
        global executor
        self.state = {}
        self.started = None
        db.session.add(self)
        db.session.commit()
        # The progress callback is appended as the last positional argument.
        args = list(self.args or ()) + [self.callback]
        try:
            future = executor.submit_stored(self.id, compute_route, args, self.kwargs)
        except (BrokenProcessPool, RuntimeError) as e:
            print("Error:", e)
            print("Restarting Executor!")
            app.executor = executor = Executor(app)
            future = executor.submit_stored(self.id, compute_route, args, self.kwargs)
        future.add_done_callback(self.done)

    def callback(self, cb_state):
        """Record a progress report, persisting at most once every 5 seconds.

        Best effort: any exception is printed and swallowed.
        """
        try:
            if self.started is None:
                self.started = datetime.today()
            if self.last_update is not None:
                time_since_last_upd = (
                    datetime.today() - self.last_update
                ).total_seconds()
                if time_since_last_upd < 5.0:
                    return
            # Assign a new dict so the change to the JSON column is detected.
            state = {}
            state.update(self.state)
            state.update({"progress": cb_state})
            self.state = state
            self.last_update = datetime.today()
            db.session.add(self)
            db.session.commit()
        except Exception as e:
            print(e)

    def done(self, future):
        """Done-callback: store the result or the error of *future* and commit."""
        print(self.id, "DONE")
        state = {}
        state.update(self.state)
        executor.futures.pop(self.id)
        exc = future.exception()
        if exc:
            state.update(
                {"error": {"type": type(exc).__name__, "args": list(exc.args)}}
            )
        else:
            state.update({"result": future.result()})
        self.state = state
        self.finished = datetime.now()
        db.session.add(self)
        db.session.commit()


class Ship(db.Model):
    """A ship definition (JSON) belonging to a user."""

    user_name = db.Column(
        db.String,
        db.ForeignKey("user.name"),
        nullable=True,
        index=True,
        primary_key=True,
    )
    user = relationship("User", backref="ships")
    # JSONType comes from sqlalchemy_utils; it is not an attribute of ``db``.
    ship = db.Column(JSONType, nullable=False, index=True, primary_key=True)


db.create_all()

# Seed the built-in roles; the commit makes them persist.
for role in ["admin", "user", "worker_host"]:
    if Role.query.filter_by(name=role).one_or_none() is None:
        db.session.add(Role(role))
db.session.commit()


def create_user(name, password, roles, active=False):
    """Create user *name*, replacing any existing user of that name.

    Args:
        name: user name (primary key).
        password: plain-text password, stored through ``PasswordType``.
        roles: iterable of existing role names to assign.
        active: whether the account is active.

    Returns:
        The newly created ``User``.
    """
    user = User.query.filter_by(name=name).one_or_none()
    if user:
        db.session.delete(user)
    user = User(name=name, password=password, is_active=active)
    user.add_roles(roles)
    db.session.add(user)
    db.session.commit()
    return user


# create_user("admin", "admin", ["admin", "user"], True)
# create_user("user", "user", ["user"], True)
# create_user("host", "host", ["user", "worker_host"], True)


class SQLAView(ModelView):
    """Base admin view, accessible only to authenticated admins."""

    column_exclude_list = ["password"]
    column_editable_list = []
    create_modal = True
    edit_modal = True
    can_view_details = True
    column_display_pk = True

    def is_accessible(self):
        return current_user.is_authenticated and current_user.has_role("admin")

    def inaccessible_callback(self, name, **kwargs):
        return redirect(url_for("login"))


class UserView(SQLAView):
    """Admin view for users; the password is never displayed."""

    from wtforms import PasswordField

    # The model column is ``is_active``; there is no ``active`` attribute.
    column_list = ("name", "is_active", "password", "api_key", "roles")
    column_formatters = {
        "password": lambda view, context, model, name: "",
        "api_key": lambda view, context, model, name: model.api_key or "",
    }
    form_extra_fields = {"password": PasswordField("Password")}


class JobView(SQLAView):
    """Admin view for jobs, showing the derived status label."""

    # Job.id,Job.user,Job.func,Job.args,Job.kwargs,Job.state,Job.created,Job.finished,Job.started,Job.last_update
    column_list = ("id", "status", "user", "created", "started", "finished")
    column_formatters = {"status": lambda view, context, model, name: model.status[1]}


class WorkerView(SQLAView):
    """Admin view for workers (no customisation yet)."""

    pass
    # # Job.id,Job.user,Job.func,Job.args,Job.kwargs,Job.state,Job.created,Job.finished,Job.started,Job.last_update
    # column_list = ("id", "status", "user", "created", "started", "finished")
    # column_formatters = {
    #     "user": lambda view, context, model, name: model.user.name
    #     if model.user
    #     else "",
    #     "status": lambda view, context, model, name: model.status[1],
    # }


admin.add_view(JobView(Job, db.session))
admin.add_view(UserView(User, db.session))
admin.add_view(SQLAView(Worker, db.session))
admin.add_view(SQLAView(Role, db.session))


def submit_job(func, *args, **kwargs):
    """Create a job for *func*, start it and return its id."""
    job = Job.new(func, args, kwargs)
    job.start()
    return job.id


@app.route("/api/route", methods=["GET", "POST"])
@use_kwargs(
    {
        "jump_range": fields.Float(required=True),
        "mode": fields.String(
            missing="bfs", validate=validate.OneOf(["bfs", "greedy", "a-star"])
        ),
        "systems": fields.DelimitedList(fields.String, required=True),
        "permute": fields.String(
            missing=None,
            validate=validate.OneOf(
                ["off", "all", "keep_first", "keep_last", "keep_both"]
            ),
        ),
        "primary": fields.Boolean(missing=False),
        "factor": fields.Float(missing=0.5),
    }
)
def api_route(_=None, **args):
    """Queue a route computation and return ``{"id": <job id>}`` as JSON.

    The star database path is read from ``app.config["STARS_PATH"]`` and
    falls back to ``DEFAULT_STARS_PATH``.
    """
    permute = args["permute"]
    if permute == "off":
        permute = None
    systems = [s.strip() for s in args["systems"]]
    # Positional argument order expected by ed_lrr.route.
    route_args = (
        systems,
        args["jump_range"],
        None,
        args["mode"],
        args["primary"],
        permute is not None,
        permute in ["keep_first", "keep_both"],
        permute in ["keep_last", "keep_both"],
        args["factor"],
        None,
        app.config.get("STARS_PATH", DEFAULT_STARS_PATH),
        app.config["ROUTE_WORKERS"],
    )
    return jsonify({"id": submit_job(ed_lrr.route, *route_args)})


@app.route("/api/status")
def api_status():
    """Report how many futures the executor currently stores."""
    info = {"queued_jobs": len(executor.futures._futures)}
    return jsonify(info)


@app.route("/api/whoami")
def api_whoami():
    """Return the current user's name, or null for anonymous users."""
    return jsonify({"name": getattr(current_user, "name", None)})


@app.route("/api/status/<uuid:job_id>")
def api_job_status(job_id):
    """Return the job summary as JSON, or 404 for an unknown id."""
    job = Job.query.get_or_404(str(job_id))
    return jsonify(job.dict)


@app.route("/static/<path:path>")
def send_static(path):
    """Serve a file from the ``static`` directory."""
    return send_from_directory("static", path)


@app.route("/route", methods=["GET", "POST"])
@login_required
def route():
    """Show the route form; on submit queue the job and redirect to its page."""
    form = RouteForm()
    if form.validate_on_submit():
        data = dict(form.data)
        if data["permute"] == "off":
            data["permute"] = None
        del data["csrf_token"]
        del data["submit"]
        job = api_route(data)
        return redirect(url_for("status", job_id=job.json["id"]))
    return render_template("form.html", form=form, title="Plot Route")


@app.route("/status/", defaults={"job_id": None})
@app.route("/status/<uuid:job_id>")
@login_required
def status(job_id=None):
    """Show one job (when *job_id* is given) or the job overview."""
    if job_id is not None:
        job = Job.query.get_or_404(str(job_id))
        return render_template("job.html", job=job)
    return render_template("status.html", Job=Job, state=request.args.get("state"))


@app.route("/")
def index():
    return render_template("index.html")


@app.route("/login", methods=["GET", "POST"])
def login():
    """Log a user in and redirect to a safe ``next`` URL or the job overview."""
    if current_user.is_authenticated:
        return redirect(url_for("index"))
    form = LoginForm()
    if form.validate_on_submit():
        user = User.query.filter_by(name=form.data["username"]).one_or_none()
        if (user is None) or (user.password != form.data["password"]):
            flash("Invalid credentials!", "danger")
            return redirect(url_for("login"))
        if not user.is_active:
            flash("Account is deactivated!", "warning")
            return redirect(url_for("login"))
        login_user(user, remember=form.data["remember"])
        next_url = request.args.get("next")
        # Only follow redirect targets that the helper considers safe.
        if not is_safe_url(next_url):
            next_url = None
        return redirect(next_url or url_for("status"))
    return render_template("form.html", form=form, title="Login")


@app.route("/register", methods=["GET", "POST"])
def register():
    """Create a new account from the registration form."""
    form = RegisterForm()
    if form.validate_on_submit():
        if User.query.filter_by(name=form.data["username"]).one_or_none() is not None:
            flash("Username already exists", "danger")
            return render_template("form.html", form=form, title="Register")
        user = User()
        user.name = form.data["username"]
        user.password = form.data["password"]
        db.session.add(user)
        db.session.commit()
        # NOTE(review): new accounts default to is_active=False, so this
        # login presumably does not take effect until activation -- confirm.
        login_user(user)
        return redirect(url_for("status"))
    return render_template("form.html", form=form, title="Register")


@app.route("/change_password", methods=["GET", "POST"])
def change_password():
    """Change the current user's password after verifying the old one."""
    if current_user.is_anonymous:
        return redirect(url_for("index"))
    form = ChangePasswordForm()
    if form.validate_on_submit():
        if current_user.password != form.data["old_password"]:
            flash("Wrong password!", "danger")
            return render_template("form.html", form=form, title="Change Password")
        current_user.password = form.data["password"]
        # Persist the change; without a commit it is lost after the request.
        db.session.add(current_user)
        db.session.commit()
        flash("Password changed!", "success")
        return redirect(url_for("status"))
    return render_template("form.html", form=form, title="Change Password")


@app.route("/workers/", defaults={"worker_id": None})
@app.route("/workers/<worker_id>")
@login_required
def worker(worker_id):
    """Render the workers page (*worker_id* is currently unused)."""
    return render_template("workers.html")


@app.route("/logout")
def logout():
    logout_user()
    return redirect(url_for("login"))


@app.before_first_request
def resume_jobs():
    """Restart every job that is not "Done" before the first request."""
    print("NEXT:", Job.get_next())
    with app.test_request_context():
        for job in Job.query.all():
            if job.status[1] != "Done":
                print("Restarting {} with state {}".format(job.id, job.status[1]))
                job.start()


user_cli = AppGroup("user", help="Manage users")
job_cli = AppGroup("job", help="Manage Jobs")
worker_cli = AppGroup("worker", help="Manage Workers")


@app.cli.command("gevent")
def cmd_gevent():
    # Placeholder command: intentionally does nothing yet.
    return


@user_cli.command("create")
@click.argument("name")
@click.option(
    "-i",
    "--inactive",
    help="Create account as inactive",
    is_flag=True,
    default=False,
)
@click.option(
    "-r", "--role", help="Assign role to account", default=["user"], multiple=True
)
@click.password_option("-p", "--password", help="Password for user")
def cmd_create_user(name, role, password, inactive):
    """Create a new user"""
    # ``role`` holds every -r/--role value; the account is active unless -i is given.
    create_user(name, password, role, not inactive)
    click.echo("User created!")


# Register the command groups with the Flask CLI.
app.cli.add_command(user_cli)
app.cli.add_command(job_cli)
app.cli.add_command(worker_cli)

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=3777, debug=True)