# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm

import re
import json
import asyncio
import os
import random
from datetime import datetime
from collections import Counter
from functools import wraps
from typing import List, Union
from io import BytesIO

import psutil
import aiohttp
from aiohttp_socks import ProxyConnector
from PIL import Image

import settings


RE_ADDRESS = r"^[a-zA-Z0-9]{97}$"

# Exclusive upper bounds (ascending) and the label returned by `whaleornot`
# for amounts below that bound. Labels are runtime strings and are kept
# exactly as they were, spelling included.
_FISH_TIERS = (
    (100, "plankton"),
    (200, "Paedocypris"),
    (500, "Dwarf Goby"),
    (1000, "European Pilchard"),
    (2000, "Goldfish"),
    (4000, "Herring"),
    (7000, "Atlantic Macerel"),
    (9000, "Gilt-head Bream"),
    (12000, "Salmonidae"),
    (20000, "Gadidae"),
    (40000, "Norwegian Delicious Salmon"),
    (60000, "Electric eel"),
    (80000, "Tuna"),
    (100000, "Wels catfish"),
    (120000, "Black marlin"),
    (160000, "Shark"),
    (220000, "Dolphin"),
    (320000, "Narwhal"),
    (500000, "Orca"),
    (700000, "Blue Whale"),
    (1000000, "Leviathan"),
)


def print_banner():
    """Print the red ASCII-art startup banner, including the coin symbol."""
    # Backslashes are escaped so the literal contains no invalid escape
    # sequences (a SyntaxWarning on recent Python versions); the printed
    # characters are unchanged.
    # NOTE(review): the art's internal spacing was reconstructed from a
    # whitespace-collapsed copy of this file -- confirm the alignment.
    print(f"""\033[91m
   _______________                        |*\\_/*|________
  |  ___________  |      .-.     .-.      ||_/-\\_|______  |
  | |           | |     .****. .****.     | |           | |
  | |   0   0   | |     .*****.*****.     | |   0   0   | |
  | |     -     | |      .*********.      | |     -     | |
  | |   \\___/   | |       .*******.       | |   \\___/   | |
  | |___     ___| |        .*****.        | |___________| |
  |_____|\\_/|_____|         .***.         |_______________|
     _|__|/ \\|_|_.............*.............._|________|_
    / ********** \\                          / ********** \\
  /  ************  \\                      /  ************  \\
 --------------------        {settings.COIN_SYMBOL}        --------------------\033[0m
    """.strip())


def collect_websocket(func):
    """Decorator that gives the wrapped coroutine its own registered queue.

    For every call a fresh `asyncio.Queue` is added to
    `connected_websockets`, passed to `func` as the first positional
    argument, and removed again when the call finishes or raises.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # imported lazily, as everywhere else in this module
        from wowlet_backend.factory import connected_websockets

        queue = asyncio.Queue()
        connected_websockets.add(queue)
        try:
            return await func(queue, *args, **kwargs)
        finally:
            connected_websockets.remove(queue)
    return wrapper


async def httpget(url: str, json=True, timeout: int = 5, socks5: Union[str, None] = None,
                  raise_for_status=True, verify_tls=True):
    """Perform an HTTP GET request and return the response body.

    :param url: URL to fetch.
    :param json: decode the body as JSON when true, else return text.
        (The name shadows the `json` module inside this function only.)
    :param timeout: total request timeout in seconds.
    :param socks5: optional proxy URL handed to `ProxyConnector.from_url`.
    :param raise_for_status: raise on HTTP error statuses when true.
    :param verify_tls: set to False to skip TLS certificate verification.
    :raises Exception: when the decoded body is `None` or an empty string.
    """
    headers = {"User-Agent": random_agent()}
    session_opts = {"timeout": aiohttp.ClientTimeout(total=timeout)}
    if socks5:
        session_opts["connector"] = ProxyConnector.from_url(socks5)

    # Only pass `ssl` when verification must be disabled. Leaving it unset
    # selects aiohttp's default, verifying behaviour on every release.
    # NOTE(review): older aiohttp releases appear to treat an explicit
    # `ssl=True` as "do not verify" -- confirm against the pinned version.
    request_opts = {"headers": headers}
    if not verify_tls:
        request_opts["ssl"] = False

    async with aiohttp.ClientSession(**session_opts) as session:
        async with session.get(url, **request_opts) as response:
            if raise_for_status:
                response.raise_for_status()

            result = await response.json() if json else await response.text()
            if result is None or result == "":
                raise Exception("empty response from request")
            return result


def random_agent():
    """Return a random entry from the application's `user_agents`."""
    from wowlet_backend.factory import user_agents
    return random.choice(user_agents)


async def feather_data():
    """A collection of data collected by
    `FeatherTask`, for Feather wallet clients."""
    from wowlet_backend.factory import cache, now

    cached = await cache.get("data")
    if cached:
        return json.loads(cached)

    keys = ["blockheights", "funding_proposals", "crypto_rates", "fiat_rates", "reddit",
            "rpc_nodes", "xmrig", "xmrto_rates", "suchwow", "forum", "wowlet_releases"]
    values = await cache.mget(*keys)
    data = {key: json.loads(val) if val else None for key, val in zip(keys, values)}

    # @TODO: for backward-compat reasons we're including some legacy keys
    # which can be removed after 1.0 release
    data['nodes'] = data['rpc_nodes']
    data['ccs'] = data['funding_proposals']
    data['wfs'] = data['funding_proposals']

    # start caching when application lifetime is more than 20 seconds
    if (datetime.now() - now).total_seconds() > 20:
        await cache.setex("data", 30, json.dumps(data))
    return data


def popularity_contest(lst: List[int]) -> Union[int, None]:
    """Return most common occurrences of List[int]. If
    there are no duplicates, return max() instead.

    Returns `None` for an empty list.
    """
    if not lst:
        return None
    if len(set(lst)) == len(lst):
        return max(lst)
    return Counter(lst).most_common(1)[0][0]


def current_worker_thread_is_primary() -> bool:
    """
    ASGI server (Hypercorn) may start multiple
    worker threads, but we only want one feather-ws
    instance to schedule `FeatherTask` tasks at an
    interval. Therefore this function determines if the
    current instance is responsible for the
    recurring Feather tasks.

    The worker with the lowest PID among the parent's "hypercorn"
    children is the primary one. When that layout cannot be recognised,
    the current process is treated as primary.
    """
    from wowlet_backend.factory import app

    current_pid = os.getpid()
    parent_pid = os.getppid()
    app.logger.debug(f"current_pid: {current_pid}, "
                     f"parent_pid: {parent_pid}")

    if parent_pid == 0:
        return True

    parent = psutil.Process(parent_pid)
    if parent.name() != "hypercorn":
        return True

    # If no child is named "hypercorn", fall back to "primary" like the
    # checks above do, instead of letting min() raise on an empty sequence.
    lowest_pid = min(
        (c.pid for c in parent.children(recursive=True) if c.name() == "hypercorn"),
        default=current_pid,
    )
    # always a real bool, as the annotation promises
    return current_pid == lowest_pid


async def image_resize(buffer: bytes, max_bounding_box: int = 512, quality: int = 70) -> bytes:
    """
    - Resize if the image is too large
    - PNG -> JPEG
    - Removes EXIF

    :param buffer: encoded source image.
    :param max_bounding_box: largest allowed width/height in pixels.
    :param quality: JPEG quality passed to the encoder.
    :return: the re-encoded JPEG bytes.
    """
    image = Image.open(BytesIO(buffer)).convert('RGB')

    if max(image.height, image.width) > max_bounding_box:
        image.thumbnail((max_bounding_box, max_bounding_box), Image.BICUBIC)

    # Copy only the pixel data into a brand-new image so that nothing else
    # attached to the source image is carried over.
    image_without_exif = Image.new(image.mode, image.size)
    image_without_exif.putdata(list(image.getdata()))

    output = BytesIO()
    image_without_exif.save(output, "JPEG", quality=quality)
    return output.getvalue()


async def whaleornot(amount: float) -> str:
    """Map an amount to a label, from "amoeba" up to "Cthulu".

    Amounts `<= 0` give "amoeba". Otherwise the label of the first tier
    whose exclusive upper bound exceeds `amount` is returned, and anything
    at or above the last bound gives "Cthulu".
    """
    if amount <= 0:
        return "amoeba"
    for upper_bound, fish_str in _FISH_TIERS:
        if amount < upper_bound:
            return fish_str
    return "Cthulu"