MediaDash/utils.py

409 lines
12 KiB
Python
Raw Normal View History

2021-12-13 18:11:43 +00:00
import base64
import functools
import hashlib
import inspect
import json
2021-08-29 13:03:28 +00:00
import math
2021-12-13 18:11:43 +00:00
import operator as op
import os
2021-08-29 13:03:28 +00:00
import random
2021-12-13 18:11:43 +00:00
import shlex
2021-08-29 13:03:28 +00:00
import string
2021-12-13 18:11:43 +00:00
import subprocess as SP
import sys
import textwrap
import time
from datetime import datetime, timedelta
2021-08-29 13:03:28 +00:00
from functools import wraps
from io import BytesIO
2021-12-13 18:11:43 +00:00
from pprint import pformat
from urllib.parse import quote, unquote_plus, urljoin, urlparse
from urllib.request import urlopen
import asteval
import cachetools
from cachetools import TTLCache
from dominate import tags
from flask import current_app, flash, json, redirect, request
from flask_login import current_user
from flask_login import login_required as _login_required
from flask_nav.renderers import Renderer, SimpleRenderer
from PIL import Image, ImageDraw, ImageFont
from slugify import slugify
def is_safe_url(target):
    """Tell whether *target* stays on the host serving the current request.

    *target* is resolved against ``request.host_url``; the result is accepted
    only when its scheme is ``http`` or ``https`` and its network location
    equals the one of the host URL.
    """
    host_url = request.host_url
    resolved = urlparse(urljoin(host_url, target))
    if resolved.scheme not in ("http", "https"):
        return False
    return urlparse(host_url).netloc == resolved.netloc
def admin_required(func):
    """Decorate a view so that only authenticated admin users may call it.

    Anonymous users get the login manager's ``unauthorized()`` response;
    authenticated users without the admin flag are shown an error flash and
    redirected to ``/``.  The returned wrapper carries ``requires_admin = True``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            return current_app.login_manager.unauthorized()
        if not current_user.is_admin:
            flash("Insufficient permissions!", "error")
            return redirect("/")
        return func(*args, **kwargs)

    wrapper.requires_admin = True
    return wrapper
def login_required(func):
    """Wrap *func* with flask_login's ``login_required`` and tag the result.

    The wrapped callable gets a ``requires_login = True`` attribute before it
    is returned.
    """
    protected = _login_required(func)
    protected.requires_login = True
    return protected
def timed_cache(**timedelta_kwargs):
    """Build a decorator that memoises a callable in a time-limited cache.

    Keyword arguments are forwarded to ``datetime.timedelta`` and define how
    long an entry stays valid; without any, entries live for ten minutes.

    The cache key replaces the first positional argument with its *type*
    (every other argument is used as-is), so when a method is decorated the
    entries are keyed per class rather than per instance.

    One ``TTLCache`` is created per ``timed_cache(...)`` call; applying the
    same returned decorator to several callables makes them share that cache.
    """
    lifetime = timedelta(**(timedelta_kwargs or {"minutes": 10})).total_seconds()
    cache = TTLCache(sys.maxsize, lifetime)

    def make_key(*args, **kwargs):
        # Guard against calls without positional arguments, which used to
        # fail with IndexError on ``args[0]``.
        if args:
            args = (type(args[0]), *args[1:])
        return cachetools.keys.hashkey(*args, **kwargs)

    def _wrapper(func):
        return cachetools.cached(cache, key=make_key)(func)

    return _wrapper
2021-08-29 13:03:28 +00:00
def handle_config(cfg=None):
    """Load or store the configuration kept in ``config.json``.

    With ``cfg=None`` the file is read from the current working directory
    (when it exists).  Whatever configuration is then at hand - the one passed
    in or the one just read - is written back to ``config.json`` with a
    four-space indent and returned.

    When no configuration was passed and none could be read, ``None`` is
    returned and the file is left untouched, instead of creating a
    ``config.json`` that only contains ``null``.
    """
    if cfg is None and os.path.isfile("config.json"):
        with open("config.json") as fh:
            cfg = json.load(fh)
    if cfg is None:
        # Nothing to persist.
        return None
    with open("config.json", "w") as fh:
        json.dump(cfg, fh, indent=4)
    return cfg
2021-08-29 13:03:28 +00:00
def with_application_context(app):
    """Return a decorator that runs the decorated callable inside
    ``app.app_context()``.

    Positional and keyword arguments are passed through unchanged and the
    callable's return value is handed back to the caller.
    """

    def decorator(func):
        @functools.wraps(func)
        def inside_context(*args, **kwargs):
            with app.app_context():
                result = func(*args, **kwargs)
            return result

        return inside_context

    return decorator
2021-08-29 13:03:28 +00:00
def getsize(text, font_size):
    """Measure (possibly multi-line) *text* set in ``arial.ttf`` at *font_size*.

    Returns the value of ``FreeTypeFont.getsize_multiline``, which callers in
    this module unpack as ``(width, height)``.
    """
    # NOTE(review): ``getsize_multiline`` was deprecated in Pillow 9.2 and
    # removed in Pillow 10 - confirm the pinned Pillow version.
    arial = ImageFont.truetype("arial.ttf", font_size)
    dimensions = arial.getsize_multiline(text)
    return dimensions
def does_text_fit(text, width, height, font_size):
    """Return True when *text* at *font_size* is strictly smaller than the
    ``width`` x ``height`` box in both dimensions."""
    text_width, text_height = getsize(text, font_size)
    if text_width >= width:
        return False
    return text_height < height
def make_placeholder_image(text, width, height, poster=None, wrap=0):
    """Render *text* centred on a ``width`` x ``height`` PNG placeholder.

    Args:
        text: Caption to draw.
        width, height: Canvas size; converted with ``int()``.
        poster: Optional URL of an image that is scaled to the canvas width
            and pasted, vertically centred, behind the caption.  Any error
            while fetching/opening it is ignored and no poster is drawn.
        wrap: When non-zero, the caption is wrapped to this many columns with
            ``textwrap.fill`` first.

    Returns:
        A ``BytesIO`` rewound to position 0 that holds the PNG data.
    """
    width, height, wrap = int(width), int(height), int(wrap)
    if wrap:
        text = textwrap.fill(text, wrap)

    # Phase 1: keep doubling the upper bound while the text still fits.
    bounds = (0, 1)
    while does_text_fit(text, width, height, bounds[1]):
        bounds = (bounds[1], bounds[1] * 2)

    # Phase 2: move one end of the interval to its midpoint per step until
    # the interval stops changing; the lower end becomes the font size.
    previous = None
    while True:
        low, high = bounds
        middle = int(round((low + high) / 2, 0))
        if does_text_fit(text, width, height, high):
            bounds = (middle, high)
        else:
            bounds = (low, middle)
        if bounds == previous:
            break
        previous = bounds
    font_size = bounds[0]

    canvas = Image.new("RGBA", (width, height), "#222")
    draw = ImageDraw.Draw(canvas)
    font = ImageFont.truetype("arial.ttf", font_size)
    text_w, text_h = getsize(text, font_size)

    if poster:
        try:
            with urlopen(poster) as fh:
                poster = Image.open(fh)
        except Exception:
            # Best effort: a missing/broken poster just means no background.
            poster = None
        else:
            src_w, src_h = poster.size
            factor = width / src_w
            poster = poster.resize(
                (math.ceil(src_w * factor), math.ceil(src_h * factor))
            )
            # Negative offset centres a poster taller than the canvas.
            mid = -int((poster.size[1] - height) / 2)
            canvas.paste(poster, (0, mid))

    draw.text(
        ((width - text_w) / 2, (height - text_h) / 2),
        text,
        fill="#eee",
        font=font,
    )

    buffer = BytesIO()
    canvas.save(buffer, "PNG")
    buffer.seek(0)
    return buffer
def make_tree(files, child_key="children"):
    """Turn a flat list of file records into a nested dict keyed by path part.

    Each record must have a ``"name"`` whose components are separated by
    ``/``.  Every leading component becomes (or reuses) a nested dict and the
    last component is mapped to ``{"__info__": record}``.

    ``child_key`` is accepted but not used by the implementation.
    """
    tree = {}
    for entry in files:
        *folders, leaf = entry["name"].split("/")
        node = tree
        for folder in folders:
            node = node.setdefault(folder, {})
        node[leaf] = {"__info__": entry}
    return tree
class BootsrapRenderer(Renderer):
    """Navigation renderer producing dominate tags with Bootstrap-style
    class names (``navbar-nav``, ``nav-item``, ``dropdown-menu``, ...)."""

    def visit_Navbar(self, node):
        """Render every item of the navbar into one ``<ul>``."""
        rendered = [self.visit(item) for item in node.items]
        return tags.ul(rendered, cls="navbar-nav mr-auto")

    def visit_Text(self, node):
        """Render a plain text node as a ``navbar-text`` span."""
        return tags.span(node.text, cls="navbar-text")

    def visit_View(self, node):
        """Render a view link as ``<li class="nav-item">``.

        An optional ``__badge`` entry in ``node.url_for_kwargs`` - indexed as
        ``(text, style)`` - is removed from the kwargs and rendered as a
        notification badge in front of the link.
        """
        # Popped before ``get_url()`` so the badge is not part of the kwargs
        # used to build the URL.
        badge = node.url_for_kwargs.pop("__badge", None)
        # Work on a copy: appending "active" to ``node.classes`` itself would
        # permanently alter the node.
        classes = list(node.classes) if hasattr(node, "classes") else ["nav-link"]
        if node.active:
            classes.append("active")
        ret = [tags.a(node.text, href=node.get_url(), cls=" ".join(classes))]
        if badge:
            ret.insert(
                0,
                tags.span(
                    badge[0],
                    cls="badge badge-{} notification-badge".format(badge[1]),
                ),
            )
        return tags.li(ret, cls="nav-item")

    def visit_Subgroup(self, node):
        """Render a subgroup as a link followed by a dropdown of its items.

        A subgroup with an empty title is treated specially: its first item
        becomes the clickable link and the remaining items form the dropdown.

        Returns a two-element list: the link and the dropdown ``<li>``.
        """
        url = "#"
        child_active = False
        if node.title == "":
            # Promote the first item to be the group's own link.
            node, *children = node.items
            child_active = any(child.active for child in children)
            node.items = children
            node.title = node.text
            url = node.get_url()
        dropdown = tags.ul(
            [
                tags.li(
                    tags.a(
                        item.text,
                        href=item.get_url(),
                        cls="nav-link active" if item.active else "nav-link",
                        style="",
                    ),
                    cls="nav-item",
                )
                for item in node.items
            ],
            cls="dropdown-menu ",
        )
        link = tags.a(
            node.title,
            href=url,
            cls="nav-link active" if node.active else "nav-link",
            style="",
        )
        toggle = tags.a(
            [],
            cls="dropdown-toggle nav-link active"
            if child_active
            else "dropdown-toggle nav-link",
            data_toggle="dropdown",
            href="#",
            style="padding-left: 0px; padding-top: 10px",
        )
        return [link, tags.li([toggle, dropdown], cls="dropdown nav-item")]
def eval_expr(expr, ctx=None):
    """Evaluate *expr* with a minimal asteval interpreter (numpy disabled).

    *ctx* is handed to the interpreter as its symbol table.
    """
    interpreter = asteval.Interpreter(
        minimal=True,
        use_numpy=False,
        symtable=ctx,
    )
    outcome = interpreter(expr)
    return outcome
def sort_by(values, expr):
    """Return *values* sorted by the result of evaluating *expr* per element.

    Each element is passed to ``eval_expr`` as the evaluation context and the
    evaluated result is used as the sort key.
    """
    # The original sorted the undefined name ``value`` and therefore always
    # raised NameError.
    return sorted(values, key=lambda v: eval_expr(expr, v))
2021-12-13 18:11:43 +00:00
2021-08-29 13:03:28 +00:00
def genpw(num=20):
    """Generate a random password of *num* ASCII letters and digits.

    Characters are drawn with ``secrets.choice`` so the result is suitable
    for use as a credential; ``random.choice`` is not meant for that.
    """
    # Local import: the module's import block does not provide ``secrets``.
    import secrets

    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(num))
def populate_form(form, cfg=None):
    """Pre-fill *form* from a configuration mapping.

    When *cfg* is ``None`` it is obtained from ``handle_config()``.  Every
    field's ``default`` is set to the configuration value stored under the
    field's name (``None`` when absent).  The choices of
    ``transcode_default_profile`` become an empty entry followed by one
    ``(name, name)`` pair per key of ``cfg["transcode_profiles"]``.
    """
    if cfg is None:
        cfg = handle_config()

    for field_name, field in form._fields.items():
        field.default = cfg.get(field_name)

    # A missing or falsy ``transcode_profiles`` entry counts as "no profiles".
    profiles = cfg.get("transcode_profiles", {}) or {}
    form.transcode_default_profile.choices = [
        (None, ""),
        *((profile, profile) for profile in profiles),
    ]
def validate_transcoding_profiles(profiles):
    """Check that every transcoding profile has string ``command`` and ``doc``.

    Args:
        profiles: Mapping of profile name to profile data.

    Raises:
        ValueError: On the first profile that lacks a required key or holds
            a value of the wrong type for it.
    """
    required = (("command", str), ("doc", str))
    for profile_name, profile in profiles.items():
        for key, expected_type in required:
            if key not in profile:
                message = "Profile '{}' is missing required key '{}'".format(
                    profile_name, key
                )
                raise ValueError(message)
            if isinstance(profile[key], expected_type):
                continue
            message = "Key '{}' of profile '{}' should be of type '{}'".format(
                key, profile_name, expected_type.__name__
            )
            raise ValueError(message)
def setup_template_filters(app):
    """Register this module's Jinja filters and template globals on *app*.

    Filters: ``flatten``, ``defaultattr``, ``pformat``, ``hash``,
    ``regex_replace``, ``ctime``, ``ago``, ``ago_dt``, ``ago_dt_utc``,
    ``ago_dt_utc_human``, ``timedelta``, ``base64``, ``fromiso``,
    ``slugify``, ``fromjson`` and ``make_tree``.
    Globals: ``urljoin`` and ``cfg`` (``handle_config``).
    """
    # Local import: ``regex_replace`` needs ``re`` and the module's import
    # block does not provide it.
    import re

    def _round_delta(delta, rnd):
        """Round *delta* to *rnd* decimal places of a second (no-op for None)."""
        if rnd is None:
            return delta
        return timedelta(seconds=round(delta.total_seconds(), rnd))

    def _delta_text(delta):
        """Format *delta* without trailing zeros in its fractional seconds.

        Only the fractional part is trimmed; stripping zeros from the whole
        string would turn ``0:00:10`` into ``0:00:1``.
        """
        text = str(delta)
        if "." in text:
            text = text.rstrip("0").rstrip(".")
        return text

    @app.template_filter("flatten")
    def flatten(obj, path=None):
        """Yield ``(path, value)`` pairs for every leaf of nested dicts/lists.

        Dict keys are joined with ``.`` (visited in sorted order) and list
        positions are written as ``[index]``.
        """
        path = path or ""
        if isinstance(obj, dict):
            for k, v in sorted(obj.items()):
                yield from flatten(v, "{}.{}".format(path, k))
        elif isinstance(obj, list):
            for k, v in enumerate(obj):
                yield from flatten(v, "{}[{}]".format(path, k))
        else:
            yield path.lstrip("."), obj

    @app.template_filter("defaultattr")
    def defaultattr(lst, attr, val):
        """Set ``item[attr] = val`` on every dict of *lst* lacking *attr*."""
        assert isinstance(lst, list)
        for item in lst:
            assert isinstance(item, dict)
            if attr not in item:
                item[attr] = val
        return lst

    @app.template_filter("pformat")
    def t_pformat(o):
        """Pretty-print *o* with ``pprint.pformat``."""
        return pformat(o)

    @app.template_filter("hash")
    def t_hash(s):
        """Return the hex SHA-512 digest of the UTF-8 encoded string *s*."""
        return hashlib.sha512(bytes(s, "utf-8")).hexdigest()

    @app.template_filter()
    def regex_replace(s, find, replace):
        """A non-optimal implementation of a regex filter"""
        return re.sub(find, replace, s)

    @app.template_filter("ctime")
    def timectime(s):
        """Format the epoch timestamp *s* with ``time.ctime``."""
        return time.ctime(s)

    @app.template_filter("ago")
    def timeago(s, clamp=False):
        """Return the time elapsed since epoch timestamp *s* as a timedelta,
        rounded to whole seconds; *clamp* floors negative values at zero."""
        seconds = round(time.time() - s, 0)
        if clamp:
            seconds = max(0, seconds)
        return timedelta(seconds=seconds)

    @app.template_filter("ago_dt")
    def ago_dt(s, rnd=None):
        """Text form of ``datetime.today() - s``, optionally rounded."""
        return _delta_text(_round_delta(datetime.today() - s, rnd))

    @app.template_filter("ago_dt_utc")
    def ago_dt_utc(s, rnd=None):
        """Text form of ``datetime.utcnow() - s``, optionally rounded."""
        return _delta_text(_round_delta(datetime.utcnow() - s, rnd))

    @app.template_filter("ago_dt_utc_human")
    def ago_dt_utc_human(s, swap=False, rnd=None):
        """Describe the distance between *s* and the current UTC time.

        Returns ``"<delta> ago"`` for non-negative differences and
        ``"In <delta>"`` for negative ones; *swap* flips the subtraction.
        """
        now = datetime.utcnow()
        dt = _round_delta(s - now if swap else now - s, rnd)
        if dt.total_seconds() < 0:
            return "In " + _delta_text(-dt)
        return _delta_text(dt) + " ago"

    @app.template_filter("timedelta")
    def time_timedelta(s, digits=None, clamp=False):
        """Convert *s* seconds to a timedelta, optionally clamped at zero and
        rounded to *digits* decimal places."""
        if clamp:
            s = max(s, 0)
        if digits is not None:
            s = round(s, digits)
        return timedelta(seconds=s)

    @app.template_filter("base64")
    def jinja_b64(s):
        """Base64-encode the UTF-8 bytes of *s* and return the result as str."""
        return str(base64.b64encode(bytes(s, "utf8")), "utf8")

    @app.template_filter("fromiso")
    def time_fromiso(s):
        """Parse ``YYYY-MM-DDTHH:MM:SS[.fraction][Z]`` into a naive datetime.

        A purely numeric fraction is kept as microseconds (padded or
        truncated to six digits); anything else after the dot is ignored.
        """
        stamp, _, fraction = s.rstrip("Z").partition(".")
        t = datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
        if fraction.isdecimal():
            # datetime objects are immutable, so the fraction has to be
            # applied through ``replace`` rather than attribute assignment.
            t = t.replace(microsecond=int(fraction[:6].ljust(6, "0")))
        return t

    app.add_template_global(urljoin, "urljoin")

    @app.template_filter("slugify")
    def make_slug(s):
        """Slugify *s*, restricted to ASCII characters."""
        return slugify(s, only_ascii=True)

    @app.template_filter("fromjson")
    def fromjson(s):
        """Decode the JSON document *s*."""
        return json.loads(s)

    # Registered under their own function name / the explicit global name.
    app.template_filter()(make_tree)
    app.add_template_global(handle_config, "cfg")