# MediaDash/utils.py
#
# 409 lines
# 12 KiB
# Python

import base64
import functools
import hashlib
import inspect
import json
import math
import operator as op
import os
import random
import shlex
import string
import subprocess as SP
import sys
import textwrap
import time
from datetime import datetime, timedelta
from functools import wraps
from io import BytesIO
from pprint import pformat
from urllib.parse import quote, unquote_plus, urljoin, urlparse
from urllib.request import urlopen
import asteval
import cachetools
from cachetools import TTLCache
from dominate import tags
from flask import current_app, flash, json, redirect, request
from flask_login import current_user
from flask_login import login_required as _login_required
from flask_nav.renderers import Renderer, SimpleRenderer
from PIL import Image, ImageDraw, ImageFont
from slugify import slugify
def is_safe_url(target):
    """Return True when *target* is an http(s) URL on the current host.

    *target* is resolved against ``request.host_url``; the resolved URL is
    accepted only when its scheme is ``http`` or ``https`` and its network
    location matches that of the request's host URL.
    """
    host_url = request.host_url
    expected_netloc = urlparse(host_url).netloc
    candidate = urlparse(urljoin(host_url, target))
    if candidate.scheme not in ("http", "https"):
        return False
    return expected_netloc == candidate.netloc
def admin_required(func):
    """Decorate a view so that only authenticated admin users may call it.

    Unauthenticated users get the login manager's ``unauthorized()``
    response. Authenticated users without ``is_admin`` get an error flash
    message and a redirect to ``/``. The returned wrapper carries
    ``requires_admin = True``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            return current_app.login_manager.unauthorized()
        if current_user.is_admin:
            return func(*args, **kwargs)
        flash("Insufficient permissions!", "error")
        return redirect("/")

    wrapper.requires_admin = True
    return wrapper
def login_required(func):
    """Wrap *func* with flask-login's ``login_required`` and tag the result.

    The wrapped callable gets a ``requires_login = True`` attribute before
    it is returned.
    """
    protected = _login_required(func)
    protected.requires_login = True
    return protected
def timed_cache(**timedelta_kwargs):
    """Build a caching decorator whose entries expire after a time span.

    The keyword arguments are passed to ``datetime.timedelta``; when none
    are given the lifetime is ten minutes. Each call creates one
    ``TTLCache`` that is used by every function the returned decorator is
    applied to.

    The cache key replaces the first positional argument with its type, so
    calls that differ only in that first argument's identity (but not its
    type) share an entry. Calling a decorated function without positional
    arguments raises ``IndexError`` while the key is built.
    """
    lifetime = timedelta(**(timedelta_kwargs or {"minutes": 10}))
    store = TTLCache(sys.maxsize, lifetime.total_seconds())

    def make_key(*args, **kwargs):
        # Key on the type of the first argument rather than its value.
        return cachetools.keys.hashkey(type(args[0]), *args[1:], **kwargs)

    def _wrapper(func):
        decorate = cachetools.cached(store, key=make_key)
        return decorate(func)

    return _wrapper
def handle_config(cfg=None):
    """Load or store the application configuration in ``config.json``.

    Called without an argument, the configuration is read from
    ``config.json`` in the current working directory and returned; ``None``
    is returned when that file does not exist.

    Called with *cfg*, the value is written to ``config.json`` as indented
    JSON and returned unchanged.
    """
    if cfg is None:
        # Pure read: never reopen the file for writing. Rewriting it on
        # every lookup truncates the file while other readers may be
        # parsing it, and used to create a file containing ``null`` when
        # no configuration existed yet.
        if not os.path.isfile("config.json"):
            return None
        with open("config.json") as fh:
            return json.load(fh)
    with open("config.json", "w") as fh:
        json.dump(cfg, fh, indent=4)
    return cfg
def with_application_context(app):
    """Return a decorator that runs the wrapped callable in *app*'s context.

    Every call of the decorated function enters ``app.app_context()``,
    invokes the function with the given arguments and returns its result.
    """

    def decorator(func):
        @functools.wraps(func)
        def run_in_context(*args, **kwargs):
            with app.app_context():
                result = func(*args, **kwargs)
            return result

        return run_in_context

    return decorator
def getsize(text, font_size):
    """Measure (possibly multi-line) *text* set in ``arial.ttf``.

    Returns a ``(width, height)`` pair for the text rendered at
    *font_size*.
    """
    font = ImageFont.truetype("arial.ttf", font_size)
    if hasattr(font, "getsize_multiline"):
        # Older Pillow releases: keep the original measurement call.
        return font.getsize_multiline(text)
    # ``getsize_multiline`` was removed in Pillow 10; measure through a
    # throw-away drawing surface instead. The box is anchored at (0, 0), so
    # its right/bottom edges are the extent measured from the origin.
    scratch = ImageDraw.Draw(Image.new("RGBA", (1, 1)))
    _, _, right, bottom = scratch.multiline_textbbox((0, 0), text, font=font)
    return right, bottom
def does_text_fit(text, width, height, font_size):
    """Tell whether *text* at *font_size* is strictly smaller than the box.

    Both the measured width and the measured height must be less than
    *width* and *height* respectively.
    """
    text_width, text_height = getsize(text, font_size)
    if not text_width < width:
        return False
    return text_height < height
def make_placeholder_image(text, width, height, poster=None, wrap=0):
    """Render *text* centred on a dark PNG of the requested size.

    *width*, *height* and *wrap* are converted with ``int``. When *wrap* is
    non-zero the text is re-wrapped to that many columns first. The font
    size is found by doubling an upper bound until the text no longer fits
    and then narrowing the (lower, upper) pair until it stops changing.

    When *poster* is given it is fetched with ``urlopen``, scaled to the
    image width and pasted vertically centred behind the text; any error
    while fetching or opening it is ignored and no poster is drawn.

    Returns a ``BytesIO`` positioned at offset 0 containing the PNG data.
    """
    width, height, wrap = int(width), int(height), int(wrap)
    if wrap:
        text = textwrap.fill(text, wrap)

    # Grow the upper bound until the text stops fitting.
    # NOTE(review): text that fits at every size would keep this loop
    # doubling -- confirm callers never pass such text.
    lower, upper = 0, 1
    while does_text_fit(text, width, height, upper):
        lower, upper = upper, upper * 2

    # Narrow the interval; stop once an iteration leaves it unchanged.
    seen = None
    while True:
        middle = int(round((lower + upper) / 2, 0))
        if does_text_fit(text, width, height, upper):
            lower = middle
        else:
            upper = middle
        if seen == (lower, upper):
            break
        seen = (lower, upper)
    font_size = lower

    im = Image.new("RGBA", (width, height), "#222")
    draw = ImageDraw.Draw(im)
    font = ImageFont.truetype("arial.ttf", font_size)
    text_w, text_h = getsize(text, font_size)

    if poster:
        # NOTE(review): the URL is opened as given; if it can come from
        # untrusted input this allows arbitrary URL fetches -- confirm.
        try:
            with urlopen(poster) as fh:
                poster = Image.open(fh)
        except Exception:
            poster = None
        else:
            src_w, src_h = poster.size[0], poster.size[1]
            scale = width / src_w
            poster = poster.resize(
                (math.ceil(src_w * scale), math.ceil(src_h * scale))
            )
            # Negative offset crops a taller poster equally top and bottom.
            top = -int((poster.size[1] - height) / 2)
            im.paste(poster, (0, top))

    origin = ((width - text_w) / 2, (height - text_h) / 2)
    draw.text(origin, text, fill="#eee", font=font)

    io = BytesIO()
    im.save(io, "PNG")
    io.seek(0)
    return io
def make_tree(files, child_key="children"):
    """Arrange file records into a nested dict keyed by path component.

    Each element of *files* is a mapping with a ``"name"`` entry holding a
    ``/``-separated path. Every path component becomes a nested dict key
    and the record itself is stored under ``"__info__"`` in the node of its
    last component.

    A record whose path is a prefix of paths already inserted keeps those
    descendants, so the result does not depend on the order of *files*.

    *child_key* is accepted for interface compatibility and is not used.
    """
    tree = {}
    for file in files:
        node = tree
        for part in file["name"].split("/"):
            node = node.setdefault(part, {})
        # Attach to the existing node instead of replacing it, so children
        # added earlier under the same path are not discarded.
        node["__info__"] = file
    return tree
class BootsrapRenderer(Renderer):
    """Renderer that emits navigation items as Bootstrap navbar markup."""

    def visit_Navbar(self, node):
        """Render all items of *node* into a ``navbar-nav`` list."""
        rendered = [self.visit(item) for item in node.items]
        return tags.ul(rendered, cls="navbar-nav mr-auto")

    def visit_Text(self, node):
        """Render a plain text item as a ``navbar-text`` span."""
        return tags.span(node.text, cls="navbar-text")

    def visit_View(self, node):
        """Render a view link as a ``nav-item`` list element.

        A ``"__badge"`` entry in ``node.url_for_kwargs`` is removed before
        the URL is built; when present, its first element is shown in a
        badge placed before the link and its second element is inserted
        into the ``badge-{}`` class name.
        """
        badge = node.url_for_kwargs.pop("__badge", None)
        if hasattr(node, "classes"):
            # Copy: appending "active" must not modify the node's own list.
            classes = list(node.classes)
        else:
            classes = ["nav-link"]
        if node.active:
            classes.append("active")
        ret = [tags.a(node.text, href=node.get_url(), cls=" ".join(classes))]
        if badge:
            ret.insert(
                0,
                tags.span(
                    badge[0],
                    cls="badge badge-{} notification-badge".format(badge[1]),
                ),
            )
        return tags.li(ret, cls="nav-item")

    def visit_Subgroup(self, node):
        """Render a subgroup as a link followed by a dropdown of its items.

        When the subgroup's title is empty, its first item is used as the
        visible link (text and URL) and the remaining items form the
        dropdown; the toggle is marked active if any of those remaining
        items is active. Otherwise the link points to ``#`` and all items
        go into the dropdown.

        Returns a two-element list: the link and the dropdown list item.
        """
        url = "#"
        child_active = False
        if node.title == "":
            # Promote the first item to be the group's own link. Note that
            # this sets ``items`` and ``title`` on that first item.
            node, *children = node.items
            child_active = any(child.active for child in children)
            node.items = children
            node.title = node.text
            url = node.get_url()
        dropdown = tags.ul(
            [
                tags.li(
                    tags.a(
                        item.text,
                        href=item.get_url(),
                        cls="nav-link active" if item.active else "nav-link",
                        style="",
                    ),
                    cls="nav-item",
                )
                for item in node.items
            ],
            cls="dropdown-menu ",
        )
        link = tags.a(
            node.title,
            href=url,
            cls="nav-link active" if node.active else "nav-link",
            style="",
        )
        toggle = tags.a(
            [],
            cls="dropdown-toggle nav-link active"
            if child_active
            else "dropdown-toggle nav-link",
            data_toggle="dropdown",
            href="#",
            style="padding-left: 0px; padding-top: 10px",
        )
        return [link, tags.li([toggle, dropdown], cls="dropdown nav-item")]
def eval_expr(expr, ctx=None):
    """Evaluate *expr* with a minimal asteval interpreter.

    *ctx* is handed to the interpreter as its symbol table. The
    interpreter is created with ``minimal=True`` and ``use_numpy=False``;
    the value produced by calling it on *expr* is returned.
    """
    interpreter = asteval.Interpreter(
        minimal=True,
        use_numpy=False,
        symtable=ctx,
    )
    result = interpreter(expr)
    return result
def sort_by(values, expr):
    """Return *values* sorted by the result of evaluating *expr* per item.

    Each element is passed to ``eval_expr`` as the evaluation context and
    the value of *expr* in that context is used as the sort key.
    """
    # The original referenced the undefined name ``value`` here, which
    # raised NameError on every call.
    return sorted(values, key=lambda v: eval_expr(expr, v))
def genpw(num=20):
    """Generate a random password of *num* ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom`` (operating-system
    randomness) rather than the default pseudo-random generator, whose
    output is predictable and unsuitable for secrets.
    """
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(num))
def populate_form(form, cfg=None):
    """Fill the defaults of *form*'s fields from a configuration mapping.

    When *cfg* is ``None`` the configuration is obtained from
    ``handle_config()``. Each field's ``default`` is set to the value
    stored under the field's name (``None`` if absent). The choices of
    ``form.transcode_default_profile`` become an empty entry followed by
    one ``(name, name)`` pair per key of ``cfg["transcode_profiles"]``.
    """
    config = handle_config() if cfg is None else cfg
    for name, field in form._fields.items():
        field.default = config.get(name)
    profiles = config.get("transcode_profiles", {}) or {}
    choices = [(None, "")]
    choices.extend((key, key) for key in profiles.keys())
    form.transcode_default_profile.choices = choices
def validate_transcoding_profiles(profiles):
    """Check that every profile has string ``command`` and ``doc`` entries.

    *profiles* maps profile names to their settings. A ``ValueError`` is
    raised for the first profile that lacks one of the required keys or
    holds a value of the wrong type for it; nothing is returned otherwise.
    """
    required = (("command", str), ("doc", str))
    for profile_name, settings in profiles.items():
        for key, expected_type in required:
            if key not in settings:
                message = "Profile '{}' is missing required key '{}'".format(
                    profile_name, key
                )
                raise ValueError(message)
            if isinstance(settings[key], expected_type):
                continue
            raise ValueError(
                "Key '{}' of profile '{}' should be of type '{}'".format(
                    key, profile_name, expected_type.__name__
                )
            )
def setup_template_filters(app):
    """Register this module's template filters and globals on *app*.

    Filters: ``flatten``, ``defaultattr``, ``pformat``, ``hash``,
    ``regex_replace``, ``ctime``, ``ago``, ``ago_dt``, ``ago_dt_utc``,
    ``ago_dt_utc_human``, ``timedelta``, ``base64``, ``fromiso``,
    ``slugify``, ``fromjson`` and ``make_tree``.
    Globals: ``urljoin`` and ``cfg`` (bound to ``handle_config``).
    """
    # Imported locally: ``regex_replace`` needs ``re`` and the module's
    # import block does not provide it.
    import re

    def _format_delta(delta):
        """Return ``str(delta)`` with trailing zeros of the fraction removed."""
        text = str(delta)
        # Only trim when there is a fractional part; stripping zeros from a
        # whole-second value would turn "0:00:30" into "0:00:3".
        if "." in text:
            text = text.rstrip("0")
        return text

    @app.template_filter("flatten")
    def flatten(obj, path=None):
        """Yield ``(dotted.path, value)`` pairs for every leaf in *obj*."""
        path = path or ""
        if isinstance(obj, dict):
            for k, v in sorted(obj.items()):
                yield from flatten(v, "{}.{}".format(path, k))
        elif isinstance(obj, list):
            for k, v in enumerate(obj):
                yield from flatten(v, "{}[{}]".format(path, k))
        else:
            yield path.lstrip("."), obj

    @app.template_filter("defaultattr")
    def defaultattr(lst, attr, val):
        """Set ``item[attr] = val`` on every dict in *lst* lacking *attr*."""
        assert isinstance(lst, list)
        for item in lst:
            assert isinstance(item, dict)
            if attr not in item:
                item[attr] = val
        return lst

    @app.template_filter("pformat")
    def t_pformat(o):
        """Pretty-print *o* with ``pprint.pformat``."""
        return pformat(o)

    @app.template_filter("hash")
    def t_hash(s):
        """Return the hexadecimal SHA-512 digest of the UTF-8 encoded *s*."""
        return hashlib.sha512(bytes(s, "utf-8")).hexdigest()

    @app.template_filter()
    def regex_replace(s, find, replace):
        """A non-optimal implementation of a regex filter"""
        return re.sub(find, replace, s)

    @app.template_filter("ctime")
    def timectime(s):
        """Format the timestamp *s* with ``time.ctime``."""
        return time.ctime(s)

    @app.template_filter("ago")
    def timeago(s, clamp=False):
        """Return the whole-second ``timedelta`` from timestamp *s* to now.

        With *clamp*, negative spans are reported as zero.
        """
        seconds = round(time.time() - s, 0)
        if clamp:
            seconds = max(0, seconds)
        return timedelta(seconds=seconds)

    @app.template_filter("ago_dt")
    def ago_dt(s, rnd=None):
        """Format the time elapsed since the datetime *s* (local clock).

        When *rnd* is given the span is rounded to that many decimals.
        """
        dt = datetime.today() - s
        if rnd is not None:
            secs = round(dt.total_seconds(), rnd)
            dt = timedelta(seconds=secs)
        return _format_delta(dt)

    @app.template_filter("ago_dt_utc")
    def ago_dt_utc(s, rnd=None):
        """Format the time elapsed since the datetime *s* (UTC clock).

        When *rnd* is given the span is rounded to that many decimals.
        """
        dt = datetime.utcnow() - s
        if rnd is not None:
            secs = round(dt.total_seconds(), rnd)
            dt = timedelta(seconds=secs)
        return _format_delta(dt)

    @app.template_filter("ago_dt_utc_human")
    def ago_dt_utc_human(s, swap=False, rnd=None):
        """Describe the span between *s* and the UTC clock in words.

        The span is ``now - s``, or ``s - now`` when *swap* is set. A
        negative span is rendered as ``"In <span>"``, any other as
        ``"<span> ago"``. *rnd* rounds the span to that many decimals.
        """
        if not swap:
            dt = datetime.utcnow() - s
        else:
            dt = s - datetime.utcnow()
        if rnd is not None:
            secs = round(dt.total_seconds(), rnd)
            dt = timedelta(seconds=secs)
        if dt.total_seconds() < 0:
            return "In " + _format_delta(-dt)
        return _format_delta(dt) + " ago"

    @app.template_filter("timedelta")
    def time_timedelta(s, digits=None, clamp=False):
        """Convert *s* seconds to a ``timedelta``.

        *clamp* raises negative values to zero; *digits* rounds first.
        """
        if clamp:
            s = max(s, 0)
        if digits is not None:
            s = round(s, digits)
        return timedelta(seconds=s)

    @app.template_filter("base64")
    def jinja_b64(s):
        """Return the Base64 text of the UTF-8 encoded string *s*."""
        return str(base64.b64encode(bytes(s, "utf8")), "utf8")

    @app.template_filter("fromiso")
    def time_fromiso(s):
        """Parse ``YYYY-MM-DDTHH:MM:SS[.fraction][Z]`` into a datetime.

        A fractional-seconds part, when present and numeric, is applied as
        microseconds; an unparsable fraction is ignored.
        """
        parts = s.rstrip("Z").split(".")
        t = datetime.strptime(parts[0], "%Y-%m-%dT%H:%M:%S")
        if len(parts) > 1:
            # ``datetime`` objects are immutable, so build a new one with
            # ``replace``. The fraction is a decimal part of a second:
            # pad/truncate it to six digits to obtain microseconds.
            try:
                t = t.replace(microsecond=int(parts[1][:6].ljust(6, "0")))
            except ValueError:
                pass
        return t

    app.add_template_global(urljoin, "urljoin")

    @app.template_filter("slugify")
    def make_slug(s):
        """Return the ASCII-only slug of *s*."""
        return slugify(s, only_ascii=True)

    @app.template_filter("fromjson")
    def fromjson(s):
        """Decode the JSON document *s*."""
        return json.loads(s)

    app.template_filter()(make_tree)
    app.add_template_global(handle_config, "cfg")