409 lines
12 KiB
Python
409 lines
12 KiB
Python
import base64
|
|
import functools
|
|
import hashlib
|
|
import inspect
|
|
import json
|
|
import math
|
|
import operator as op
|
|
import os
|
|
import random
|
|
import shlex
|
|
import string
|
|
import subprocess as SP
|
|
import sys
|
|
import textwrap
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from io import BytesIO
|
|
from pprint import pformat
|
|
from urllib.parse import quote, unquote_plus, urljoin, urlparse
|
|
from urllib.request import urlopen
|
|
|
|
import asteval
|
|
import cachetools
|
|
from cachetools import TTLCache
|
|
from dominate import tags
|
|
from flask import current_app, flash, json, redirect, request
|
|
from flask_login import current_user
|
|
from flask_login import login_required as _login_required
|
|
from flask_nav.renderers import Renderer, SimpleRenderer
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
from slugify import slugify
|
|
|
|
|
|
def is_safe_url(target):
|
|
ref_url = urlparse(request.host_url)
|
|
test_url = urlparse(urljoin(request.host_url, target))
|
|
return test_url.scheme in (
|
|
"http", "https") and ref_url.netloc == test_url.netloc
|
|
|
|
|
|
def admin_required(func):
|
|
@functools.wraps(func)
|
|
def wrapper(*args, **kwargs):
|
|
if current_user.is_authenticated and current_user.is_admin:
|
|
return func(*args, **kwargs)
|
|
if current_user.is_authenticated:
|
|
flash("Insufficient permissions!", "error")
|
|
return redirect("/")
|
|
return current_app.login_manager.unauthorized()
|
|
|
|
wrapper.requires_admin = True
|
|
return wrapper
|
|
|
|
|
|
def login_required(func):
|
|
func = _login_required(func)
|
|
func.requires_login = True
|
|
return func
|
|
|
|
|
|
def timed_cache(**timedelta_kwargs):
|
|
kwargs = timedelta_kwargs or {"minutes": 10}
|
|
ttl = timedelta(**kwargs).total_seconds()
|
|
cache = TTLCache(sys.maxsize, ttl)
|
|
|
|
def make_key(*args, **kwargs):
|
|
args = list(args)
|
|
args[0] = type(args[0])
|
|
return cachetools.keys.hashkey(*args, **kwargs)
|
|
|
|
def _wrapper(func):
|
|
return cachetools.cached(cache, key=make_key)(func)
|
|
|
|
return _wrapper
|
|
|
|
|
|
def handle_config(cfg=None):
|
|
if cfg is None:
|
|
if os.path.isfile("config.json"):
|
|
with open("config.json") as fh:
|
|
cfg=json.load(fh)
|
|
with open("config.json", "w") as fh:
|
|
json.dump(cfg, fh, indent=4)
|
|
return cfg
|
|
|
|
|
|
def with_application_context(app):
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def wrapped(*args, **kwargs):
|
|
with app.app_context():
|
|
return func(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
return wrapper
|
|
|
|
|
|
def getsize(text, font_size):
|
|
font = ImageFont.truetype("arial.ttf", font_size)
|
|
return font.getsize_multiline(text)
|
|
|
|
|
|
def does_text_fit(text, width, height, font_size):
|
|
w, h = getsize(text, font_size)
|
|
return w < width and h < height
|
|
|
|
|
|
def make_placeholder_image(text, width, height, poster=None, wrap=0):
|
|
width = int(width)
|
|
height = int(height)
|
|
wrap = int(wrap)
|
|
font_size = 1
|
|
bounds = (0, 1)
|
|
if wrap:
|
|
text = textwrap.fill(text, wrap)
|
|
while True:
|
|
if not does_text_fit(text, width, height, bounds[1]):
|
|
break
|
|
bounds = (bounds[1], bounds[1] * 2)
|
|
prev_bounds = None
|
|
while True:
|
|
if does_text_fit(text, width, height, bounds[1]):
|
|
bounds = (int(round(sum(bounds) / 2, 0)), bounds[1])
|
|
else:
|
|
bounds = (bounds[0], int(round(sum(bounds) / 2, 0)))
|
|
if prev_bounds == bounds:
|
|
break
|
|
prev_bounds = bounds
|
|
font_size = bounds[0]
|
|
io = BytesIO()
|
|
im = Image.new("RGBA", (width, height), "#222")
|
|
draw = ImageDraw.Draw(im)
|
|
font = ImageFont.truetype("arial.ttf", font_size)
|
|
w, h = getsize(text, font_size)
|
|
if poster:
|
|
try:
|
|
with urlopen(poster) as fh:
|
|
poster = Image.open(fh)
|
|
except Exception:
|
|
poster = None
|
|
else:
|
|
poster_size = poster.size
|
|
factor = width / poster_size[0]
|
|
new_size = (
|
|
math.ceil(poster_size[0] * factor),
|
|
math.ceil(poster_size[1] * factor),
|
|
)
|
|
poster = poster.resize(new_size)
|
|
mid = -int((poster.size[1] - height) / 2)
|
|
im.paste(poster, (0, mid))
|
|
draw.text(((width - w) / 2, (height - h) / 2),
|
|
text, fill="#eee", font=font)
|
|
im.save(io, "PNG")
|
|
io.seek(0)
|
|
return io
|
|
|
|
|
|
def make_tree(files, child_key="children"):
|
|
tree = {}
|
|
for file in files:
|
|
root = tree
|
|
parts = file["name"].split("/")
|
|
for item in parts:
|
|
if item not in root:
|
|
root[item] = {}
|
|
prev_root = root
|
|
root = root[item]
|
|
prev_root[item] = {"__info__": file}
|
|
return tree
|
|
|
|
|
|
class BootsrapRenderer(Renderer):
|
|
def visit_Navbar(self, node):
|
|
sub = []
|
|
for item in node.items:
|
|
sub.append(self.visit(item))
|
|
ret = tags.ul(sub, cls="navbar-nav mr-auto")
|
|
return ret
|
|
|
|
def visit_Text(self, node):
|
|
return tags.span(node.text, cls="navbar-text")
|
|
|
|
def visit_View(self, node):
|
|
badge = node.url_for_kwargs.pop("__badge", None)
|
|
classes = ["nav-link"]
|
|
if hasattr(node, "classes"):
|
|
classes = node.classes
|
|
if node.active:
|
|
classes.append("active")
|
|
ret = [tags.a(node.text, href=node.get_url(), cls=" ".join(classes))]
|
|
if badge:
|
|
ret.insert(
|
|
0,
|
|
tags.span(
|
|
badge[0], cls="badge badge-{} notification-badge".format(badge[1])
|
|
),
|
|
)
|
|
return tags.li(
|
|
ret,
|
|
cls="nav-item",
|
|
)
|
|
|
|
def visit_Subgroup(self, node):
|
|
url = "#"
|
|
classes = []
|
|
child_active = False
|
|
if node.title == "":
|
|
active = False
|
|
for item in node.items:
|
|
if item.active:
|
|
classes.append("active")
|
|
break
|
|
node, *children = node.items
|
|
for c in children:
|
|
if c.active:
|
|
child_active = True
|
|
break
|
|
node.items = children
|
|
node.title = node.text
|
|
url = node.get_url()
|
|
dropdown = tags.ul(
|
|
[
|
|
tags.li(
|
|
tags.a(
|
|
item.text,
|
|
href=item.get_url(),
|
|
cls="nav-link active" if item.active else "nav-link",
|
|
style="",
|
|
),
|
|
cls="nav-item",
|
|
)
|
|
for item in node.items
|
|
],
|
|
cls="dropdown-menu ",
|
|
)
|
|
link = tags.a(
|
|
node.title,
|
|
href=url,
|
|
cls="nav-link active" if node.active else "nav-link",
|
|
style="",
|
|
)
|
|
toggle = tags.a(
|
|
[],
|
|
cls="dropdown-toggle nav-link active"
|
|
if child_active
|
|
else "dropdown-toggle nav-link",
|
|
data_toggle="dropdown",
|
|
href="#",
|
|
style="padding-left: 0px; padding-top: 10px",
|
|
)
|
|
# almost the same as visit_Navbar, but written a bit more concise
|
|
return [link, tags.li([toggle, dropdown], cls="dropdown nav-item")]
|
|
|
|
|
|
def eval_expr(expr, ctx=None):
|
|
aeval = asteval.Interpreter(minimal=True, use_numpy=False, symtable=ctx)
|
|
return aeval(expr)
|
|
|
|
|
|
def sort_by(values, expr):
|
|
return sorted(value, key=lambda v: eval_expr(expr, v))
|
|
|
|
|
|
def genpw(num=20):
|
|
return "".join(
|
|
random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits)
|
|
for _ in range(num)
|
|
)
|
|
|
|
|
|
def populate_form(form, cfg=None):
|
|
if cfg is None:
|
|
cfg = handle_config()
|
|
for name, field in form._fields.items():
|
|
field.default = cfg.get(name)
|
|
form.transcode_default_profile.choices = [(None, "")]
|
|
form.transcode_default_profile.choices += [
|
|
(k, k) for k in (cfg.get("transcode_profiles", {}) or {}).keys()
|
|
]
|
|
|
|
|
|
def validate_transcoding_profiles(profiles):
|
|
for name, data in profiles.items():
|
|
for req, req_type in [("command", str), ("doc", str)]:
|
|
if req not in data:
|
|
raise ValueError(
|
|
"Profile '{}' is missing required key '{}'".format(
|
|
name, req))
|
|
if not isinstance(data[req], req_type):
|
|
raise ValueError(
|
|
"Key '{}' of profile '{}' should be of type '{}'".format(
|
|
req, name, req_type.__name__
|
|
)
|
|
)
|
|
|
|
|
|
def setup_template_filters(app):
|
|
@app.template_filter("flatten")
|
|
def flatten(obj, path=None):
|
|
path = path or ""
|
|
if isinstance(obj, dict):
|
|
for k, v in sorted(obj.items()):
|
|
yield from flatten(v, "{}.{}".format(path, k))
|
|
elif isinstance(obj, list):
|
|
for k, v in enumerate(obj):
|
|
yield from flatten(v, "{}[{}]".format(path, k))
|
|
else:
|
|
yield path.lstrip("."), obj
|
|
|
|
@app.template_filter("defaultattr")
|
|
def defaultattr(lst, attr, val):
|
|
assert isinstance(lst, list)
|
|
for item in lst:
|
|
assert isinstance(item, dict)
|
|
if attr not in item:
|
|
item[attr] = val
|
|
return lst
|
|
|
|
@app.template_filter("pformat")
|
|
def t_pformat(o):
|
|
return pformat(o)
|
|
|
|
@app.template_filter("hash")
|
|
def t_hash(s):
|
|
return hashlib.sha512(bytes(s, "utf-8")).hexdigest()
|
|
|
|
@app.template_filter()
|
|
def regex_replace(s, find, replace):
|
|
"""A non-optimal implementation of a regex filter"""
|
|
return re.sub(find, replace, s)
|
|
|
|
@app.template_filter("ctime")
|
|
def timectime(s):
|
|
return time.ctime(s)
|
|
|
|
@app.template_filter("ago")
|
|
def timeago(s, clamp=False):
|
|
seconds = round(time.time() - s, 0)
|
|
if clamp:
|
|
seconds = max(0, seconds)
|
|
return timedelta(seconds=seconds)
|
|
|
|
@app.template_filter("ago_dt")
|
|
def ago_dt(s, rnd=None):
|
|
dt = datetime.today() - s
|
|
if rnd is not None:
|
|
secs = round(dt.total_seconds(), rnd)
|
|
dt = timedelta(seconds=secs)
|
|
return str(dt).rstrip("0")
|
|
|
|
@app.template_filter("ago_dt_utc")
|
|
def ago_dt_utc(s, rnd=None):
|
|
dt = datetime.utcnow() - s
|
|
if rnd is not None:
|
|
secs = round(dt.total_seconds(), rnd)
|
|
dt = timedelta(seconds=secs)
|
|
return str(dt).rstrip("0")
|
|
|
|
@app.template_filter("ago_dt_utc_human")
|
|
def ago_dt_utc_human(s, swap=False, rnd=None):
|
|
if not swap:
|
|
dt = datetime.utcnow() - s
|
|
else:
|
|
dt = s - datetime.utcnow()
|
|
if rnd is not None:
|
|
secs = round(dt.total_seconds(), rnd)
|
|
dt = timedelta(seconds=secs)
|
|
if dt.total_seconds() < 0:
|
|
return "In " + str(-dt).rstrip("0")
|
|
else:
|
|
return str(dt).rstrip("0") + " ago"
|
|
|
|
@app.template_filter("timedelta")
|
|
def time_timedelta(s, digits=None, clamp=False):
|
|
if clamp:
|
|
s = max(s, 0)
|
|
if digits is not None:
|
|
s = round(s, digits)
|
|
return timedelta(seconds=s)
|
|
|
|
@app.template_filter("base64")
|
|
def jinja_b64(s):
|
|
return str(base64.b64encode(bytes(s, "utf8")), "utf8")
|
|
|
|
@app.template_filter("fromiso")
|
|
def time_fromiso(s):
|
|
t = s.rstrip("Z").split(".")[0]
|
|
t = datetime.strptime(t, "%Y-%m-%dT%H:%M:%S")
|
|
try:
|
|
t.microsecond = int(s.rstrip("Z").split(".")[1])
|
|
except BaseException:
|
|
pass
|
|
return t
|
|
|
|
app.add_template_global(urljoin, "urljoin")
|
|
|
|
@app.template_filter("slugify")
|
|
def make_slug(s):
|
|
return slugify(s, only_ascii=True)
|
|
|
|
@app.template_filter("fromjson")
|
|
def fromjson(s):
|
|
return json.loads(s)
|
|
|
|
app.template_filter()(make_tree)
|
|
app.add_template_global(handle_config, "cfg")
|