import base64 import functools import hashlib import inspect import json import math import operator as op import os import random import shlex import string import subprocess as SP import sys import textwrap import time from datetime import datetime, timedelta from functools import wraps from io import BytesIO from pprint import pformat from urllib.parse import quote, unquote_plus, urljoin, urlparse from urllib.request import urlopen import asteval import cachetools from cachetools import TTLCache from dominate import tags from flask import current_app, flash, json, redirect, request from flask_login import current_user from flask_login import login_required as _login_required from flask_nav.renderers import Renderer, SimpleRenderer from PIL import Image, ImageDraw, ImageFont from slugify import slugify def is_safe_url(target): ref_url = urlparse(request.host_url) test_url = urlparse(urljoin(request.host_url, target)) return test_url.scheme in ( "http", "https") and ref_url.netloc == test_url.netloc def admin_required(func): @functools.wraps(func) def wrapper(*args, **kwargs): if current_user.is_authenticated and current_user.is_admin: return func(*args, **kwargs) if current_user.is_authenticated: flash("Insufficient permissions!", "error") return redirect("/") return current_app.login_manager.unauthorized() wrapper.requires_admin = True return wrapper def login_required(func): func = _login_required(func) func.requires_login = True return func def timed_cache(**timedelta_kwargs): kwargs = timedelta_kwargs or {"minutes": 10} ttl = timedelta(**kwargs).total_seconds() cache = TTLCache(sys.maxsize, ttl) def make_key(*args, **kwargs): args = list(args) args[0] = type(args[0]) return cachetools.keys.hashkey(*args, **kwargs) def _wrapper(func): return cachetools.cached(cache, key=make_key)(func) return _wrapper def handle_config(cfg=None): if cfg is None: if os.path.isfile("config.json"): with open("config.json") as fh: cfg=json.load(fh) with open("config.json", "w") as fh: json.dump(cfg, fh, indent=4) return cfg def with_application_context(app): def wrapper(func): @wraps(func) def wrapped(*args, **kwargs): with app.app_context(): return func(*args, **kwargs) return wrapped return wrapper def getsize(text, font_size): font = ImageFont.truetype("arial.ttf", font_size) return font.getsize_multiline(text) def does_text_fit(text, width, height, font_size): w, h = getsize(text, font_size) return w < width and h < height def make_placeholder_image(text, width, height, poster=None, wrap=0): width = int(width) height = int(height) wrap = int(wrap) font_size = 1 bounds = (0, 1) if wrap: text = textwrap.fill(text, wrap) while True: if not does_text_fit(text, width, height, bounds[1]): break bounds = (bounds[1], bounds[1] * 2) prev_bounds = None while True: if does_text_fit(text, width, height, bounds[1]): bounds = (int(round(sum(bounds) / 2, 0)), bounds[1]) else: bounds = (bounds[0], int(round(sum(bounds) / 2, 0))) if prev_bounds == bounds: break prev_bounds = bounds font_size = bounds[0] io = BytesIO() im = Image.new("RGBA", (width, height), "#222") draw = ImageDraw.Draw(im) font = ImageFont.truetype("arial.ttf", font_size) w, h = getsize(text, font_size) if poster: try: with urlopen(poster) as fh: poster = Image.open(fh) except Exception: poster = None else: poster_size = poster.size factor = width / poster_size[0] new_size = ( math.ceil(poster_size[0] * factor), math.ceil(poster_size[1] * factor), ) poster = poster.resize(new_size) mid = -int((poster.size[1] - height) / 2) im.paste(poster, (0, mid)) draw.text(((width - w) / 2, (height - h) / 2), text, fill="#eee", font=font) im.save(io, "PNG") io.seek(0) return io def make_tree(files, child_key="children"): tree = {} for file in files: root = tree parts = file["name"].split("/") for item in parts: if item not in root: root[item] = {} prev_root = root root = root[item] prev_root[item] = {"__info__": file} return tree class BootsrapRenderer(Renderer): def visit_Navbar(self, node): sub = [] for item in node.items: sub.append(self.visit(item)) ret = tags.ul(sub, cls="navbar-nav mr-auto") return ret def visit_Text(self, node): return tags.span(node.text, cls="navbar-text") def visit_View(self, node): badge = node.url_for_kwargs.pop("__badge", None) classes = ["nav-link"] if hasattr(node, "classes"): classes = node.classes if node.active: classes.append("active") ret = [tags.a(node.text, href=node.get_url(), cls=" ".join(classes))] if badge: ret.insert( 0, tags.span( badge[0], cls="badge badge-{} notification-badge".format(badge[1]) ), ) return tags.li( ret, cls="nav-item", ) def visit_Subgroup(self, node): url = "#" classes = [] child_active = False if node.title == "": active = False for item in node.items: if item.active: classes.append("active") break node, *children = node.items for c in children: if c.active: child_active = True break node.items = children node.title = node.text url = node.get_url() dropdown = tags.ul( [ tags.li( tags.a( item.text, href=item.get_url(), cls="nav-link active" if item.active else "nav-link", style="", ), cls="nav-item", ) for item in node.items ], cls="dropdown-menu ", ) link = tags.a( node.title, href=url, cls="nav-link active" if node.active else "nav-link", style="", ) toggle = tags.a( [], cls="dropdown-toggle nav-link active" if child_active else "dropdown-toggle nav-link", data_toggle="dropdown", href="#", style="padding-left: 0px; padding-top: 10px", ) # almost the same as visit_Navbar, but written a bit more concise return [link, tags.li([toggle, dropdown], cls="dropdown nav-item")] def eval_expr(expr, ctx=None): aeval = asteval.Interpreter(minimal=True, use_numpy=False, symtable=ctx) return aeval(expr) def sort_by(values, expr): return sorted(value, key=lambda v: eval_expr(expr, v)) def genpw(num=20): return "".join( random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(num) ) def populate_form(form, cfg=None): if cfg is None: cfg = handle_config() for name, field in form._fields.items(): field.default = cfg.get(name) form.transcode_default_profile.choices = [(None, "")] form.transcode_default_profile.choices += [ (k, k) for k in (cfg.get("transcode_profiles", {}) or {}).keys() ] def validate_transcoding_profiles(profiles): for name, data in profiles.items(): for req, req_type in [("command", str), ("doc", str)]: if req not in data: raise ValueError( "Profile '{}' is missing required key '{}'".format( name, req)) if not isinstance(data[req], req_type): raise ValueError( "Key '{}' of profile '{}' should be of type '{}'".format( req, name, req_type.__name__ ) ) def setup_template_filters(app): @app.template_filter("flatten") def flatten(obj, path=None): path = path or "" if isinstance(obj, dict): for k, v in sorted(obj.items()): yield from flatten(v, "{}.{}".format(path, k)) elif isinstance(obj, list): for k, v in enumerate(obj): yield from flatten(v, "{}[{}]".format(path, k)) else: yield path.lstrip("."), obj @app.template_filter("defaultattr") def defaultattr(lst, attr, val): assert isinstance(lst, list) for item in lst: assert isinstance(item, dict) if attr not in item: item[attr] = val return lst @app.template_filter("pformat") def t_pformat(o): return pformat(o) @app.template_filter("hash") def t_hash(s): return hashlib.sha512(bytes(s, "utf-8")).hexdigest() @app.template_filter() def regex_replace(s, find, replace): """A non-optimal implementation of a regex filter""" return re.sub(find, replace, s) @app.template_filter("ctime") def timectime(s): return time.ctime(s) @app.template_filter("ago") def timeago(s, clamp=False): seconds = round(time.time() - s, 0) if clamp: seconds = max(0, seconds) return timedelta(seconds=seconds) @app.template_filter("ago_dt") def ago_dt(s, rnd=None): dt = datetime.today() - s if rnd is not None: secs = round(dt.total_seconds(), rnd) dt = timedelta(seconds=secs) return str(dt).rstrip("0") @app.template_filter("ago_dt_utc") def ago_dt_utc(s, rnd=None): dt = datetime.utcnow() - s if rnd is not None: secs = round(dt.total_seconds(), rnd) dt = timedelta(seconds=secs) return str(dt).rstrip("0") @app.template_filter("ago_dt_utc_human") def ago_dt_utc_human(s, swap=False, rnd=None): if not swap: dt = datetime.utcnow() - s else: dt = s - datetime.utcnow() if rnd is not None: secs = round(dt.total_seconds(), rnd) dt = timedelta(seconds=secs) if dt.total_seconds() < 0: return "In " + str(-dt).rstrip("0") else: return str(dt).rstrip("0") + " ago" @app.template_filter("timedelta") def time_timedelta(s, digits=None, clamp=False): if clamp: s = max(s, 0) if digits is not None: s = round(s, digits) return timedelta(seconds=s) @app.template_filter("base64") def jinja_b64(s): return str(base64.b64encode(bytes(s, "utf8")), "utf8") @app.template_filter("fromiso") def time_fromiso(s): t = s.rstrip("Z").split(".")[0] t = datetime.strptime(t, "%Y-%m-%dT%H:%M:%S") try: t.microsecond = int(s.rstrip("Z").split(".")[1]) except BaseException: pass return t app.add_template_global(urljoin, "urljoin") @app.template_filter("slugify") def make_slug(s): return slugify(s, only_ascii=True) @app.template_filter("fromjson") def fromjson(s): return json.loads(s) app.template_filter()(make_tree) app.add_template_global(handle_config, "cfg")