From 42bb0c832e6234e373d12824a0b5da9504dd1c61 Mon Sep 17 00:00:00 2001 From: dsc Date: Tue, 22 Dec 2020 19:03:48 +0100 Subject: [PATCH] Feather-ws rewrite; - Move recurring tasks into their own class; inherits from `FeatherTask` - CCS proposals: Don't use API, it's broken - webcrawl instead until it is fixed. - Switch to hypercorn as the ASGI server, *with* support for multiple workers. You can now run feather-ws with, for example, `--workers 6`. See `Dockerfile`. - Introduce support for various coins under `BlockheightTask` - Introduce support for various Reddit communities under `RedditTask` - Introduced weightvoting whilst validating third-party RPC blockheights - where nodes are filtered based on what other nodes are commonly reporting. - Current blockheights are fetched from various block explorers and weightvoting is done to eliminate outliers under `BlockheightTask`. - Don't filter/remove bad nodes from the rpc_nodes list; correctly label them as disabled/bad nodes. - Multiple Feather instances (each for it's own coin) can now run on one machine, using only one Redis instance, as each coins has it's own Redis database index. - Configuration options inside `settings.py` can now be controlled via environment variables. - Better logging through custom log formatting and correct usage of `app.logger.*` - Fixed a bug where one task could overlap with itself if the previous one did not finish yet. This was particularly noticable inside the `RPCNodeCheckTask` where the high timeout (for Tor nodes) could cause the task to run *longer* than the recurring task interval. - Introduced a `docker-compose.yml` to combine the Feather container with Redis and Tor containers. - Blocking IO operations are now done via `aiofiles` --- Dockerfile | 3 +- README.md | 71 ++++--- asgi.py | 6 + data/.gitkeep | 0 data/nodes.json | 98 +++++---- docker-compose.yml | 27 +++ fapi/factory.py | 118 +++++++---- fapi/fapi.py | 349 -------------------------------- fapi/routes.py | 23 +-- fapi/tasks/__init__.py | 160 +++++++++++++++ fapi/tasks/blockheight.py | 161 +++++++++++++++ fapi/tasks/historical_prices.py | 113 +++++++++++ fapi/tasks/proposals.py | 138 +++++++++++++ fapi/tasks/rates_crypto.py | 59 ++++++ fapi/tasks/rates_fiat.py | 23 +++ fapi/tasks/reddit.py | 56 +++++ fapi/tasks/rpc_nodes.py | 117 +++++++++++ fapi/tasks/xmrig.py | 58 ++++++ fapi/tasks/xmrto.py | 26 +++ fapi/utils.py | 255 ++++++++--------------- fapi/wsparse.py | 4 +- requirements.txt | 4 +- run.py | 2 +- settings.py_example | 50 ++--- 24 files changed, 1250 insertions(+), 671 deletions(-) create mode 100644 asgi.py delete mode 100644 data/.gitkeep create mode 100644 docker-compose.yml delete mode 100644 fapi/fapi.py create mode 100644 fapi/tasks/__init__.py create mode 100644 fapi/tasks/blockheight.py create mode 100644 fapi/tasks/historical_prices.py create mode 100644 fapi/tasks/proposals.py create mode 100644 fapi/tasks/rates_crypto.py create mode 100644 fapi/tasks/rates_fiat.py create mode 100644 fapi/tasks/reddit.py create mode 100644 fapi/tasks/rpc_nodes.py create mode 100644 fapi/tasks/xmrig.py create mode 100644 fapi/tasks/xmrto.py diff --git a/Dockerfile b/Dockerfile index 98c65f2..09049bf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,10 @@ FROM python:3.7 +WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . EXPOSE 1337 -CMD ["python3", "-u", "run.py"] \ No newline at end of file +CMD ["hypercorn", "--access-logfile", "-", "--workers", "1", "--bind", "0.0.0.0:18200", "asgi:app"] \ No newline at end of file diff --git a/README.md b/README.md index 66fb6fe..63bf823 100644 --- a/README.md +++ b/README.md @@ -1,34 +1,55 @@ # feather-ws -This is the back-end websocket server for Feather wallet. +Back-end websocket server for Feather wallet. -- Python 3 asyncio -- Quart web framework +- Quart web framework, Py3 asyncio - Redis +## Coins supported -### Supervisor +- Monero +- Wownero -Example config. +See also the environment variables `FEATHER_COIN_NAME`, `FEATHER_COIN_SYMBOL`, etc. in `settings.py`. + +## Tasks + +This websocket server has several scheduled recurring tasks: + +- Fetch latest blockheight from various block explorers +- Fetch crypto/fiat exchange rates +- Fetch latest Reddit posts +- Fetch funding proposals +- Check status of RPC nodes (`data/nodes.json`) + +When Feather wallet starts up, it will connect to +this websocket server and receive the information +listed above which is necessary for normal operation. + +See `fapi.tasks.*` for the various tasks. + +## Development + +Requires Python 3.7 and higher. -```text -[program:ws] -directory=/home/feather/feather-ws -command=/home/feather/feather-ws/venv/bin/python run.py -autostart=true -autorestart=true -startsecs=6 -stdout_logfile=/home/feather/feather-ws/stdout.log -stdout_logfile_maxbytes=1MB -stdout_logfile_backups=10 -stdout_capture_maxbytes=1MB -stderr_logfile=/home/feather/feather-ws/stderr.log -stderr_logfile_maxbytes=1MB -stderr_logfile_backups=10 -stderr_capture_maxbytes=1MB -user = feather -environment= - HOME="/home/feather", - USER="feather", - PATH="/home/feather/feather-ws/venv/bin" ``` +virtualenv -p /usr/bin/python3 venv +source venv/bin/activate +pip install -r requirements.txt + +export FEATHER_DEBUG=true +python run.py +``` + +Note that `run.py` is meant as a development server. For production, +use `asgi.py` with something like hypercorn. + +## Docker + +In production you may run via docker; + +``` +docker-compose up +``` + +Will bind on `http://127.0.0.1:1337`. Modify `docker-compose.yml` if necessary. diff --git a/asgi.py b/asgi.py new file mode 100644 index 0000000..50bfc10 --- /dev/null +++ b/asgi.py @@ -0,0 +1,6 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +from fapi.factory import create_app +app = create_app() diff --git a/data/.gitkeep b/data/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/data/nodes.json b/data/nodes.json index 940f145..c668037 100644 --- a/data/nodes.json +++ b/data/nodes.json @@ -1,43 +1,63 @@ { - "mainnet": { - "tor": [ - "fdlnlt5mr5o7lmhg.onion:18081", - "xmkwypann4ly64gh.onion:18081", - "xmrtolujkxnlinre.onion:18081", - "xmrag4hf5xlabmob.onion:18081", - "monero26mmldsallmxok2kwamne4ve3mybvvn2yijsvss7ey63hc4yyd.onion:18081", - "nrw57zxw5zyevn3i.onion:18081", - "monero5sjoz5xmjn.onion:18081", - "mxcd4577fldb3ppzy7obmmhnu3tf57gbcbd4qhwr2kxyjj2qi3dnbfqd.onion:18081", - "moneroxmrxw44lku6qniyarpwgznpcwml4drq7vb24ppatlcg4kmxpqd.onion:18089", - "moneroptqodufzxj.onion:18081", - "3hvpnd4xejtzcuowvru2wfjum5wjf7synigm44rrizr3k4v5vzam2bad.onion:18081", - "6dsdenp6vjkvqzy4wzsnzn6wixkdzihx3khiumyzieauxuxslmcaeiad.onion:18081", - "3t7v5zpcfxq2tocdofdcwxgrldco3elotz3iis4jtbbnscy5alezw7yd.onion:18081" - ], - "clearnet": [ - "eu-west.node.xmr.pm:18089", - "eu-west-2.node.xmr.pm:18089", - "usa-east-va.node.xmr.pm:18089", - "canada.node.xmr.pm:18089", - "singapore.node.xmr.pm:18089", - "192.110.160.146:18089", - "nodes.hashvault.pro:18081", - "node.supportxmr.com:18081", - "node.imonero.org:18081", - "xmr-node-eu.cakewallet.com:18081", - "xmr-node-usa-east.cakewallet.com:18081", - "node.xmr.pt:18081", - "node.xmr.ru:18081", - "xmr-peer-070.cypherpunklabs.com:18081", - "xmr.fail:18081" - ] + "xmr": { + "mainnet": { + "tor": [ + "fdlnlt5mr5o7lmhg.onion:18081", + "xmkwypann4ly64gh.onion:18081", + "xmrtolujkxnlinre.onion:18081", + "xmrag4hf5xlabmob.onion:18081", + "monero26mmldsallmxok2kwamne4ve3mybvvn2yijsvss7ey63hc4yyd.onion:18081", + "nrw57zxw5zyevn3i.onion:18081", + "monero5sjoz5xmjn.onion:18081", + "mxcd4577fldb3ppzy7obmmhnu3tf57gbcbd4qhwr2kxyjj2qi3dnbfqd.onion:18081", + "moneroxmrxw44lku6qniyarpwgznpcwml4drq7vb24ppatlcg4kmxpqd.onion:18089", + "moneroptqodufzxj.onion:18081", + "3hvpnd4xejtzcuowvru2wfjum5wjf7synigm44rrizr3k4v5vzam2bad.onion:18081", + "6dsdenp6vjkvqzy4wzsnzn6wixkdzihx3khiumyzieauxuxslmcaeiad.onion:18081", + "3t7v5zpcfxq2tocdofdcwxgrldco3elotz3iis4jtbbnscy5alezw7yd.onion:18081" + ], + "clearnet": [ + "eu-west.node.xmr.pm:18089", + "eu-west-2.node.xmr.pm:18089", + "usa-east-va.node.xmr.pm:18089", + "canada.node.xmr.pm:18089", + "singapore.node.xmr.pm:18089", + "192.110.160.146:18089", + "nodes.hashvault.pro:18081", + "node.supportxmr.com:18081", + "node.imonero.org:18081", + "xmr-node-eu.cakewallet.com:18081", + "xmr-node-usa-east.cakewallet.com:18081", + "node.xmr.pt:18081", + "node.xmr.ru:18081", + "xmr-peer-070.cypherpunklabs.com:18081", + "xmr.fail:18081" + ] + }, + "stagenet": { + "tor": [], + "clearnet": [ + "run.your.own.node.xmr.pm:38089", + "super.fast.node.xmr.pm:38089" + ] + } }, - "stagenet": { - "tor": [], - "clearnet": [ - "run.your.own.node.xmr.pm:38089", - "super.fast.node.xmr.pm:38089" - ] + "wow": { + "mainnet": { + "tor": [ + "wowbuxx535x4exuexja2xfezpwcyznxkofui4ndjiectj4yuh2xheiid.onion:34568" + ], + "clearnet": [ + "wow.pwned.systems:34568", + "global.wownodes.com:34568", + "node.suchwow.xyz:34568", + "super.fast.node.xmr.pm:34568", + "wowbux.org:34568" + ] + }, + "stagenet": { + "tor": [], + "clearnet": [] + } } } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..76e7e8f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,27 @@ +version: "3" + +services: + redis: + container_name: redis + image: "redis:alpine" + command: redis-server + environment: + - REDIS_REPLICATION_MODE=master + tor-node: + image: osminogin/tor-simple + restart: always + feather-ws: + container_name: feather-ws + build: + context: . + dockerfile: Dockerfile + environment: + - FEATHER_DEBUG=false + - FEATHER_PORT=1337 + - FEATHER_REDIS_ADDRESS=redis://redis + - FEATHER_TOR_SOCKS_PROXY=socks5://tor-node:9050 + - FEATHER_COIN_NAME=monero + - FEATHER_COIN_SYMBOL=xmr + - FEATHER_COIN_MODE=mainnet + ports: + - "1337:1337" diff --git a/fapi/factory.py b/fapi/factory.py index 269122d..096b89f 100644 --- a/fapi/factory.py +++ b/fapi/factory.py @@ -4,77 +4,113 @@ import json import asyncio +from typing import List, Set +from datetime import datetime from quart import Quart from quart_session import Session import aioredis +from fapi.utils import current_worker_thread_is_primary, print_banner import settings -app = None +now = datetime.now() +app: Quart = None cache = None -connected_websockets = set() -api_data = {} -user_agents = None -txfiatdb = None +rpc_nodes: dict = None +user_agents: List[str] = None +connected_websockets: Set[asyncio.Queue] = set() +_is_primary_worker_thread = False -print("""\033[91m - █████▒▓█████ ▄▄▄ ▄▄▄█████▓ ██░ ██ ▓█████ ██▀███ -▓██ ▒ ▓█ ▀▒████▄ ▓ ██▒ ▓▒▓██░ ██▒▓█ ▀ ▓██ ▒ ██▒ -▒████ ░ ▒███ ▒██ ▀█▄ ▒ ▓██░ ▒░▒██▀▀██░▒███ ▓██ ░▄█ ▒ -░▓█▒ ░ ▒▓█ ▄░██▄▄▄▄██░ ▓██▓ ░ ░▓█ ░██ ▒▓█ ▄ ▒██▀▀█▄ -░▒█░ ░▒████▒▓█ ▓██▒ ▒██▒ ░ ░▓█▒░██▓░▒████▒░██▓ ▒██▒ - ▒ ░ ░░ ▒░ ░▒▒ ▓▒█░ ▒ ░░ ▒ ░░▒░▒░░ ▒░ ░░ ▒▓ ░▒▓░ - ░ ░ ░ ░ ▒ ▒▒ ░ ░ ▒ ░▒░ ░ ░ ░ ░ ░▒ ░ ▒░ - ░ ░ ░ ░ ▒ ░ ░ ░░ ░ ░ ░░ ░ - ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ \033[0m -""".strip()) + +async def _setup_nodes(app: Quart): + global rpc_nodes + with open("data/nodes.json", "r") as f: + rpc_nodes = json.loads(f.read()).get(settings.COIN_SYMBOL) + + +async def _setup_user_agents(app: Quart): + global user_agents + with open('data/user_agents.txt', 'r') as f: + user_agents = [l.strip() for l in f.readlines() if l.strip()] async def _setup_cache(app: Quart): global cache + # Each coin has it's own Redis DB index; `redis-cli -n $INDEX` + db = {"xmr": 0, "wow": 1, "aeon": 2, "trtl": 3, "msr": 4, "xhv": 5, "loki": 6}[settings.COIN_SYMBOL] data = { - "address": settings.redis_address + "address": settings.REDIS_ADDRESS, + "db": db, + "password": settings.REDIS_PASSWORD if settings.REDIS_PASSWORD else None } - if settings.redis_password: - data['password'] = settings.redis_password - cache = await aioredis.create_redis_pool(**data) app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = cache Session(app) +async def _setup_tasks(app: Quart): + """Schedules a series of tasks at an interval.""" + if not _is_primary_worker_thread: + return + + from fapi.tasks import ( + BlockheightTask, HistoricalPriceTask, FundingProposalsTask, + CryptoRatesTask, FiatRatesTask, RedditTask, RPCNodeCheckTask, + XmrigTask, XmrToTask) + + asyncio.create_task(BlockheightTask().start()) + asyncio.create_task(HistoricalPriceTask().start()) + asyncio.create_task(CryptoRatesTask().start()) + asyncio.create_task(FiatRatesTask().start()) + asyncio.create_task(RedditTask().start()) + asyncio.create_task(RPCNodeCheckTask().start()) + asyncio.create_task(XmrigTask().start()) + + if settings.COIN_SYMBOL in ["xmr", "wow"]: + asyncio.create_task(FundingProposalsTask().start()) + + if settings.COIN_SYMBOL == "xmr": + asyncio.create_task(XmrToTask().start()) + + +def _setup_logging(): + from logging import Formatter + from logging.config import dictConfig + from quart.logging import default_handler + default_handler.setFormatter(Formatter('[%(asctime)s] %(levelname)s in %(funcName)s(): %(message)s (%(pathname)s)')) + + dictConfig({ + 'version': 1, + 'loggers': { + 'quart.app': { + 'level': 'DEBUG' if settings.DEBUG else 'INFO', + }, + }, + }) + + def create_app(): global app + + _setup_logging() app = Quart(__name__) @app.before_serving async def startup(): - global txfiatdb, user_agents + global _is_primary_worker_thread + _is_primary_worker_thread = current_worker_thread_is_primary() + + if _is_primary_worker_thread: + print_banner() + await _setup_cache(app) - loop = asyncio.get_event_loop() + await _setup_nodes(app) + await _setup_user_agents(app) + await _setup_tasks(app) - with open('data/nodes.json', 'r') as f: - nodes = json.loads(f.read()) - cache.execute('JSON.SET', 'nodes', '.', json.dumps(nodes)) - - with open('data/user_agents.txt', 'r') as f: - user_agents = [l.strip() for l in f.readlines() if l.strip()] - - from fapi.fapi import FeatherApi - from fapi.utils import loopyloop, TxFiatDb, XmrRig - txfiatdb = TxFiatDb(settings.crypto_name, settings.crypto_block_date_start) - loop.create_task(loopyloop(20, FeatherApi.xmrto_rates, FeatherApi.after_xmrto)) - loop.create_task(loopyloop(120, FeatherApi.crypto_rates, FeatherApi.after_crypto)) - loop.create_task(loopyloop(600, FeatherApi.fiat_rates, FeatherApi.after_fiat)) - loop.create_task(loopyloop(300, FeatherApi.ccs, FeatherApi.after_ccs)) - loop.create_task(loopyloop(900, FeatherApi.reddit, FeatherApi.after_reddit)) - loop.create_task(loopyloop(60, FeatherApi.blockheight, FeatherApi.after_blockheight)) - loop.create_task(loopyloop(60, FeatherApi.check_nodes, FeatherApi.after_check_nodes)) - loop.create_task(loopyloop(43200, txfiatdb.update)) - loop.create_task(loopyloop(43200, XmrRig.releases, XmrRig.after_releases)) import fapi.routes return app diff --git a/fapi/fapi.py b/fapi/fapi.py deleted file mode 100644 index f592de1..0000000 --- a/fapi/fapi.py +++ /dev/null @@ -1,349 +0,0 @@ -# SPDX-License-Identifier: BSD-3-Clause -# Copyright (c) 2020, The Monero Project. -# Copyright (c) 2020, dsc@xmr.pm - -import json - -import aiohttp -from bs4 import BeautifulSoup -from aiohttp_socks import ProxyType, ProxyConnector, ChainProxyConnector -from fapi.utils import broadcast_blockheight, broadcast_nodes, httpget, BlockHeight - -import settings - - -class FeatherApi: - @staticmethod - async def redis_get(key): - from fapi.factory import app, cache - try: - data = await cache.get(key) - if data: - return json.loads(data) - except Exception as ex: - app.logger.error(f"Redis error: {ex}") - - @staticmethod - async def redis_json_get(key, path="."): - from fapi.factory import app, cache - try: - data = await cache.execute('JSON.GET', key, path) - if data: - return json.loads(data) - except Exception as ex: - app.logger.error(f"Redis error: {ex}") - - @staticmethod - async def xmrto_rates(): - from fapi.factory import app, cache - xmrto_rates = await FeatherApi.redis_get("xmrto_rates") - if xmrto_rates and app.config["DEBUG"]: - return xmrto_rates - - try: - result = await httpget(settings.urls["xmrto_rates"]) - if not result: - raise Exception("empty response") - if "error" in result: - raise Exception(f"${result['error']} ${result['error_msg']}") - return result - except Exception as ex: - app.logger.error(f"error parsing xmrto_rates blob: {ex}") - return xmrto_rates - - @staticmethod - async def after_xmrto(data): - from fapi.factory import app, cache, api_data, connected_websockets - if not data: - return - - _data = api_data.get("xmrto_rates", {}) - _data = json.dumps(_data, sort_keys=True, indent=4) - if json.dumps(data, sort_keys=True, indent=4) == _data: - return - - api_data["xmrto_rates"] = data - - @staticmethod - async def crypto_rates(): - from fapi.factory import app, cache - crypto_rates = await FeatherApi.redis_get("crypto_rates") - if crypto_rates and app.config["DEBUG"]: - return crypto_rates - - result = None - try: - result = await httpget(settings.urls["crypto_rates"]) - if not result: - raise Exception("empty response") - crypto_rates = result - except Exception as ex: - app.logger.error(f"error parsing crypto_rates blob: {ex}") - - if not result and crypto_rates: - app.logger.warning("USING OLD CACHE FOR CRYPTO RATES") - return crypto_rates - - # grab WOW price while we're at it... - - try: - _result = await httpget(settings.urls["crypto_wow_rates"]) - if not _result: - raise Exception("empty response") - except Exception as ex: - _result = {} - if "wownero" in _result and "usd" in _result["wownero"]: - crypto_rates.append({ - "id": "wownero", - "symbol": "wow", - "image": "", - "name": "Wownero", - "current_price": _result["wownero"]["usd"], - "price_change_percentage_24h": 0.0 - }) - - await cache.set("crypto_rates", json.dumps(crypto_rates)) - return crypto_rates - - @staticmethod - async def after_crypto(data): - from fapi.factory import app, cache, api_data, connected_websockets - if not data: - return - - _data = api_data.get("crypto_rates", {}) - _data = json.dumps(_data, sort_keys=True, indent=4) - if json.dumps(data, sort_keys=True, indent=4) == _data: - return - - _data = [] - for obj in data: - _data.append({ - "id": obj['id'], - "symbol": obj['symbol'], - "image": obj['image'], - "name": obj['name'], - "current_price": obj['current_price'], - "price_change_percentage_24h": obj['price_change_percentage_24h'] - }) - - api_data["crypto_rates"] = data - for queue in connected_websockets: - await queue.put({ - "cmd": "crypto_rates", - "data": { - "crypto_rates": api_data["crypto_rates"] - } - }) - - @staticmethod - async def fiat_rates(): - from fapi.factory import app, cache - fiat_rates = await FeatherApi.redis_get("fiat_rates") - if fiat_rates and app.config["DEBUG"]: - return fiat_rates - - try: - result = await httpget(settings.urls["fiat_rates"], json=True) - if not result: - raise Exception("empty response") - await cache.set("fiat_rates", json.dumps(result)) - return result - except Exception as ex: - app.logger.error(f"error parsing fiat_rates blob: {ex}") - - # old cache - app.logger.warning("USING OLD CACHE FOR FIAT RATES") - return fiat_rates - - @staticmethod - async def after_fiat(data): - from fapi.factory import app, cache, api_data, connected_websockets - if not data: - return - - _data = api_data.get("fiat_rates", {}) - _data = json.dumps(_data, sort_keys=True, indent=4) - if json.dumps(data, sort_keys=True, indent=4) == _data: - return - - api_data["fiat_rates"] = data - for queue in connected_websockets: - await queue.put({ - "cmd": "fiat_rates", - "data": { - "fiat_rates": api_data["fiat_rates"] - } - }) - - @staticmethod - async def ccs(): - from fapi.factory import app, cache - ccs = await FeatherApi.redis_get("ccs") - if ccs and app.config["DEBUG"]: - return ccs - - try: - content = await httpget(f"https://ccs.getmonero.org/index.php/projects", json=True) - - data = [p for p in content["data"] if p["state"] == "FUNDING-REQUIRED" and p['address'] != '8Bok6rt3aCYE41d3YxfMfpSBD6rMDeV9cchSM99KwPFi5GHXe28pHXcYzqtej52TQJT4M8zhfyaoCXDoioR7nSfpC7St48K'] - for p in data: - p.update({"url": settings.urls['ccs']+'/funding-required/'}) - - await cache.set("ccs", json.dumps(data)) - return data - except Exception as ex: - app.logger.error(f"Error parsing CCS data: {ex}") - - return ccs - - @staticmethod - async def after_ccs(data): - from fapi.factory import app, cache, api_data, connected_websockets - if not data: - return - - _data = api_data.get("ccs", {}) - _data = json.dumps(_data, sort_keys=True, indent=4) - if json.dumps(data, sort_keys=True, indent=4) == _data: - return - - api_data["ccs"] = data - for queue in connected_websockets: - await queue.put({ - "cmd": "ccs", - "data": api_data["ccs"] - }) - - @staticmethod - async def reddit(): - from fapi.factory import app, cache - reddit = await FeatherApi.redis_get("reddit") - if reddit and app.config["DEBUG"]: - return reddit - - try: - blob = await httpget(settings.urls["reddit"]) - if not blob: - raise Exception("no data from url") - blob = [{ - 'title': z['data']['title'], - 'author': z['data']['author'], - 'url': "https://old.reddit.com" + z['data']['permalink'], - 'comments': z['data']['num_comments'] - } for z in blob['data']['children']] - - # success - if blob: - await cache.set("reddit", json.dumps(blob)) - return blob - except Exception as ex: - app.logger.error(f"error parsing reddit blob: {ex}") - - # old cache - return reddit - - @staticmethod - async def after_reddit(data): - from fapi.factory import app, cache, api_data, connected_websockets - if not data: - return - - _data = api_data.get("reddit", {}) - _data = json.dumps(_data, sort_keys=True, indent=4) - if json.dumps(data, sort_keys=True, indent=4) == _data: - return - - api_data["reddit"] = data - for queue in connected_websockets: - await queue.put({ - "cmd": "reddit", - "data": api_data["reddit"] - }) - - @staticmethod - async def blockheight(): - from fapi.factory import app, cache - data = {"mainnet": 0, "stagenet": 0} - - for stagenet in [False, True]: - try: - data["mainnet" if stagenet is False else "stagenet"] = \ - await BlockHeight.xmrchain(stagenet) - except Exception as ex: - app.logger.error(f"Could not fetch blockheight from xmrchain") - try: - data["mainnet" if stagenet is False else "stagenet"] = \ - await BlockHeight.xmrto(stagenet) - except: - app.logger.error(f"Could not fetch blockheight from xmr.to") - return data - - @staticmethod - async def after_blockheight(data): - from fapi.factory import app, cache, api_data - - changed = False - api_data.setdefault("blockheights", {}) - if data["mainnet"] > 1 and data["mainnet"] > api_data["blockheights"].get("mainnet", 1): - api_data["blockheights"]["mainnet"] = data["mainnet"] - changed = True - if data["stagenet"] > 1 and data["stagenet"] > api_data["blockheights"].get("stagenet", 1): - api_data["blockheights"]["stagenet"] = data["stagenet"] - changed = True - - if changed: - await broadcast_blockheight() - - @staticmethod - async def check_nodes(): - from fapi.factory import app - - nodes = await FeatherApi.redis_json_get("nodes") - - data = [] - for network_type, network_name in nodes.items(): - for k, _nodes in nodes[network_type].items(): - for node in _nodes: - timeout = aiohttp.ClientTimeout(total=5) - d = {'timeout': timeout} - if ".onion" in node: - d['connector'] = ProxyConnector.from_url(settings.tor_socks) - d['timeout'] = aiohttp.ClientTimeout(total=12) - try: - async with aiohttp.ClientSession(**d) as session: - async with session.get(f"http://{node}/get_info") as response: - blob = await response.json() - for expect in ["nettype", "height", "target_height"]: - assert expect in blob - _node = { - "address": node, - "height": int(blob["height"]), - "target_height": int(blob["target_height"]), - "online": True, - "nettype": blob["nettype"], - "type": k - } - - # Filter out nodes affected by < v0.17.1.3 sybil attack - if _node['target_height'] > _node["height"]: - continue - - except Exception as ex: - app.logger.warning(f"node {node} not reachable") - _node = { - "address": node, - "height": 0, - "target_height": 0, - "online": False, - "nettype": network_type, - "type": k - } - data.append(_node) - return data - - @staticmethod - async def after_check_nodes(data): - from fapi.factory import api_data - api_data["nodes"] = data - await broadcast_nodes() diff --git a/fapi/routes.py b/fapi/routes.py index 752523a..26464e0 100644 --- a/fapi/routes.py +++ b/fapi/routes.py @@ -4,39 +4,36 @@ import asyncio import json -from copy import deepcopy from quart import websocket, jsonify from fapi.factory import app from fapi.wsparse import WebsocketParse -from fapi.utils import collect_websocket +from fapi.utils import collect_websocket, feather_data @app.route("/") async def root(): - from fapi.factory import api_data - return jsonify(api_data) + data = await feather_data() + return jsonify(data) @app.websocket('/ws') @collect_websocket async def ws(queue): - from fapi.factory import api_data + data = await feather_data() - # blast data on connect - _api_data = deepcopy(api_data) # prevent race condition - for k, v in _api_data.items(): - if not v: + # blast available data on connect + for task_key, task_value in data.items(): + if not task_value: continue - await websocket.send(json.dumps({"cmd": k, "data": v}).encode()) - _api_data = None + await websocket.send(json.dumps({"cmd": task_key, "data": task_value}).encode()) async def rx(): while True: - data = await websocket.receive() + buffer = await websocket.receive() try: - blob = json.loads(data) + blob = json.loads(buffer) if "cmd" not in blob: continue cmd = blob.get('cmd') diff --git a/fapi/tasks/__init__.py b/fapi/tasks/__init__.py new file mode 100644 index 0000000..ef76b14 --- /dev/null +++ b/fapi/tasks/__init__.py @@ -0,0 +1,160 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import json +import asyncio +import random +from typing import Union + + +class FeatherTask: + """ + The base class of many recurring tasks for this + project. This abstracts away some functionality: + + 1. Tasks are automatically cached in Redis if the `_cache_key` is set. + 2. The task result is propagated to connected websocket clients if + `_websocket_cmd` is set. + 3. Inheritors should implement the `task()` method. + 4. Inheritors can optionally implement the `done()` method. + """ + def __init__(self, interval: int): + """ + :param interval: secs + """ + self.interval = interval + + # propogate to websocket clients? + self._websocket_cmd: str = None + + # redis + self._cache_key: str = None + self._cache_expiry: int = None + + # logging + self._qualname: str = f"{self.__class__.__module__}.{self.__class__.__name__}" + + self._active = True + self._running = False + + async def start(self, *args, **kwargs): + from fapi.factory import app, connected_websockets + if not self._active: + # invalid task + return + + app.logger.info(f"Starting task {self._qualname}") + sleep = lambda: asyncio.sleep(random.randrange(self.interval - 5, + self.interval + 5)) + while True: + if not self._active: + # invalid task + return + + if self._running: + # task already running, wait for completion + await asyncio.sleep(5) + continue + + try: + self._running = True + result: dict = await self.task(*args, **kwargs) + if not result: + raise Exception("No result") + except Exception as ex: + app.logger.error(f"{self._qualname} - {ex}") + + # if the task failed we can attempt to use an old value from the cache. + if not self._cache_key: + app.logger.warning(f"{self._qualname} - No cache key for task, skipping") + await sleep() + self._running = False + continue + + app.logger.info(f"{self._qualname} - trying cache") + result = await self.cache_get(self._cache_key) + if result: + app.logger.warning(f"serving cached result for {self._qualname}") + else: + app.logger.error(f"{self._qualname} - cache lookup failed, fix me") + await sleep() + self._running = False + continue + + # optional: propogate result to websocket peers + if self._websocket_cmd and result: + # but only when there is a change + normalize = lambda k: json.dumps(k, sort_keys=True, indent=4) + propagate = True + + cached = await self.cache_get(self._cache_key) + if cached: + if normalize(cached) == normalize(result): + propagate = False + + if propagate: + for queue in connected_websockets: + await queue.put({ + "cmd": self._websocket_cmd, + "data": { + self._websocket_cmd: result + } + }) + + # optional: cache the result + if self._cache_key and result: + await self.cache_set(self._cache_key, result, self._cache_expiry) + + # optional: call completion function + if 'done' in self.__class__.__dict__: + await self.done(result) + + await sleep() + self._running = False + + async def task(self, *args, **kwargs): + raise NotImplementedError() + + async def done(self, *args, **kwargs): + """overload this method to execute this function after + completion of `task`. Results from `task` are parameters + for `done`.""" + raise NotImplementedError() + + async def end(self, result: dict): + raise NotImplementedError() + + async def cache_get(self, key: str) -> dict: + from fapi.factory import app, cache + + try: + data = await cache.get(key) + if not data: + return {} + return json.loads(data) + except Exception as ex: + app.logger.error(f"Redis GET error with key '{key}': {ex}") + + async def cache_set(self, key, val: Union[dict, int], expiry: int = 0) -> bool: + from fapi.factory import app, cache + try: + data = json.dumps(val) + if isinstance(expiry, int) and expiry > 0: + await cache.setex(key, expiry, data) + else: + await cache.set(key, data) + return True + except Exception as ex: + app.logger.error(f"Redis SET error with key '{key}': {ex}") + + +from fapi.tasks.proposals import FundingProposalsTask +from fapi.tasks.historical_prices import HistoricalPriceTask +from fapi.tasks.blockheight import BlockheightTask +from fapi.tasks.rates_fiat import FiatRatesTask +from fapi.tasks.rates_crypto import CryptoRatesTask +from fapi.tasks.reddit import RedditTask +from fapi.tasks.rpc_nodes import RPCNodeCheckTask +from fapi.tasks.xmrig import XmrigTask +from fapi.tasks.xmrto import XmrToTask diff --git a/fapi/tasks/blockheight.py b/fapi/tasks/blockheight.py new file mode 100644 index 0000000..4e8c669 --- /dev/null +++ b/fapi/tasks/blockheight.py @@ -0,0 +1,161 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import re +from typing import Union +from collections import Counter +from functools import partial + +import settings +from fapi.utils import httpget, popularity_contest +from fapi.tasks import FeatherTask + + +class BlockheightTask(FeatherTask): + """ + Fetch latest blockheight using webcrawling. We pick the most popular + height from a list of websites. Arguably this approach has benefits + over querying a (local) Monero RPC instance, as that requires + maintenance, while this solution assumes that (at least) 2 websites + reports the correct height. + """ + def __init__(self, interval: int = 60): + super(BlockheightTask, self).__init__(interval) + + self._cache_key = "blockheights" + self._cache_expiry = 90 + + self._websocket_cmd = "blockheights" + + self._fns = { + "xmr": { + "mainnet": [ + self._blockchair, + partial(self._onion_explorer, url="https://xmrchain.net/"), + partial(self._onion_explorer, url="https://community.xmr.to/explorer/mainnet/"), + partial(self._onion_explorer, url="https://monero.exan.tech/") + ], + "stagenet": [ + partial(self._onion_explorer, url="https://stagenet.xmrchain.net/"), + partial(self._onion_explorer, url="https://community.xmr.to/explorer/stagenet/"), + partial(self._onion_explorer, url="https://monero-stagenet.exan.tech/") + ] + }, + "wow": { + "mainnet": [ + partial(self._onion_explorer, url="https://explore.wownero.com/"), + ] + }, + "aeon": { + "mainnet": [ + partial(self._onion_explorer, url="https://aeonblockexplorer.com/"), + ], + "stagenet": [ + partial(self._onion_explorer, url="http://162.210.173.151:8083/"), + ] + }, + "trtl": { + "mainnet": [ + self._turtlenode, + self._turtlenetwork, + self._l33d4n + ] + }, + "xhv": { + "mainnet": [ + partial(self._onion_explorer, url="https://explorer.havenprotocol.org/") + ], + "stagenet": [ + partial(self._onion_explorer, url="https://explorer.stagenet.havenprotocol.org/page/1") + ] + }, + "loki": { + "mainnet": [ + partial(self._onion_explorer, url="https://lokiblocks.com/") + ], + "testnet": [ + partial(self._onion_explorer, url="https://lokitestnet.com/") + ] + } + } + + async def task(self) -> Union[dict, None]: + from fapi.factory import app + coin_network_types = ["mainnet", "stagenet", "testnet"] + data = {t: 0 for t in coin_network_types} + + for coin_network_type in coin_network_types: + if coin_network_type not in self._fns[settings.COIN_SYMBOL]: + continue + + heights = [] + for fn in self._fns[settings.COIN_SYMBOL][coin_network_type]: + fn_name = fn.func.__name__ if isinstance(fn, partial) else fn.__name__ + + try: + result = await fn() + heights.append(result) + except Exception as ex: + app.logger.error(f"blockheight fetch failed from {fn_name}(): {ex}") + continue + + if heights: + data[coin_network_type] = popularity_contest(heights) + + if data["mainnet"] == 0: # only care about mainnet + app.logger.error(f"Failed to parse latest blockheight!") + return + + return data + + async def _blockchair(self) -> int: + re_blockheight = r"(\d+)" + + url = "https://blockchair.com/monero" + content = await httpget(url, json=False, raise_for_status=True) + + height = re.findall(re_blockheight, content) + height = max(map(int, height)) + return height + + async def _wownero(self) -> int: + url = "https://explore.wownero.com/" + return await BlockheightTask._onion_explorer(url) + + async def _turtlenode(self) -> int: + url = "https://public.turtlenode.net/info" + blob = await httpget(url, json=True, raise_for_status=True) + height = int(blob.get("height", 0)) + if height <= 0: + raise Exception("bad height") + return height + + async def _turtlenetwork(self) -> int: + url = "https://tnnode2.turtlenetwork.eu/blocks/height" + blob = await httpget(url, json=True, raise_for_status=True) + height = int(blob.get("height", 0)) + if height <= 0: + raise Exception("bad height") + return height + + async def _l33d4n(self): + url = "https://blockapi.turtlepay.io/block/header/top" + blob = await httpget(url, json=True, raise_for_status=True) + height = int(blob.get("height", 0)) + if height <= 0: + raise Exception("bad height") + return height + + @staticmethod + async def _onion_explorer(url): + """ + Pages that are based on: + https://github.com/moneroexamples/onion-monero-blockchain-explorer + """ + re_blockheight = r"block\/(\d+)\"\>" + content = await httpget(url, json=False) + + height = re.findall(re_blockheight, content) + height = max(map(int, height)) + return height diff --git a/fapi/tasks/historical_prices.py b/fapi/tasks/historical_prices.py new file mode 100644 index 0000000..1ffd724 --- /dev/null +++ b/fapi/tasks/historical_prices.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import os +import json +from typing import List, Union +from datetime import datetime + +import aiofiles + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class HistoricalPriceTask(FeatherTask): + """ + This class manages a historical price (USD) database, saved in a + textfile at `self._path`. A Feather wallet instance will ask + for the historical fiat price database on startup (but only + in chunks of a month for anti-fingerprinting reasons). + + The task in this class simply keeps the fiat database + up-to-date locally. + """ + def __init__(self, interval: int = 43200): + super(HistoricalPriceTask, self).__init__(interval) + + self._cache_key = f"historical_fiat" + self._path = f"data/historical_prices_{settings.COIN_SYMBOL}.json" + self._http_endpoint = f"https://www.coingecko.com/price_charts/{settings.COIN_NAME}/usd/max.json" + + self._year_genesis = int(settings.COIN_GENESIS_DATE[:4]) + + self._load() + + async def task(self) -> Union[dict, None]: + content = await httpget(self._http_endpoint, json=True, raise_for_status=False) + if "stats" not in content: + raise Exception() + + stats: List[List] = content.get('stats', []) # [[timestamp,USD],] + if not stats: + return + + data = { + year: { + month: {} for month in range(1, 13) + } for year in range(self._year_genesis, datetime.now().year + 1) + } + + # timestamp:USD + daily_price_blob = {day[0]: day[1] for day in stats} + + # normalize + for timestamp, usd in daily_price_blob.items(): + _date = datetime.fromtimestamp(timestamp / 1000) + data[_date.year].setdefault(_date.month, {}) + data[_date.year][_date.month][_date.day] = usd + + # update local database + await self._write(data) + return data + + async def _load(self) -> None: + if not os.path.exists(self._path): + return + + async with aiofiles.open(self._path, mode="r") as f: + content = await f.read() + blob = json.loads(content) + + # ¯\_(ツ)_/¯ + blob = {int(k): { + int(_k): { + int(__k): __v for __k, __v in _v.items() + } for _k, _v in v.items() + } for k, v in blob.items()} + + await self.cache_set(self._cache_key, blob) + + async def _write(self, blob: dict) -> None: + data = json.dumps(blob, sort_keys=True, indent=4) + async with aiofiles.open(self._path, mode="w") as f: + await f.write(data) + + @staticmethod + async def get(year: int, month: int = None) -> Union[dict, None]: + """This function is called when a Feather wallet client asks + for (a range of) historical fiat information. It returns the + data filtered by the parameters.""" + from fapi.factory import cache + + blob = await cache.get("historical_fiat") + blob = json.loads(blob) + if year not in blob: + return + + rtn = {} + if not month: + for _m, days in blob[year].items(): + for day, price in days.items(): + rtn[datetime(year, _m, day).strftime('%Y%m%d')] = price + return rtn + + if month not in blob[year]: + return + + for day, price in blob[year][month].items(): + rtn[datetime(year, month, day).strftime('%Y%m%d')] = price + + return rtn diff --git a/fapi/tasks/proposals.py b/fapi/tasks/proposals.py new file mode 100644 index 0000000..bdffbfe --- /dev/null +++ b/fapi/tasks/proposals.py @@ -0,0 +1,138 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +from bs4 import BeautifulSoup +from typing import List + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class FundingProposalsTask(FeatherTask): + """Fetch funding proposals made by the community.""" + def __init__(self, interval: int = 600): + from fapi.factory import app + super(FundingProposalsTask, self).__init__(interval) + + self._cache_key = "funding_proposals" + self._cache_expiry = self.interval * 1000 + + # url + self._http_endpoints = { + "xmr": "https://ccs.getmonero.org", + "wow": "https://funding.wownero.com" + } + + if settings.COIN_SYMBOL not in self._http_endpoints: + app.logger.warning(f"Missing proposal URL for {settings.COIN_SYMBOL.upper()}, ignoring update task") + self._active = False + + self._http_endpoint = self._http_endpoints[settings.COIN_SYMBOL] + if self._http_endpoint.endswith("/"): + self._http_endpoint = self._http_endpoint[:-1] + + # websocket + self._websocket_cmd = "funding_proposals" + self._websocket_cmds = { + "xmr": "ccs", + "wow": "wfs" + } + + if settings.COIN_SYMBOL not in self._websocket_cmds: + app.logger.warning(f"Missing websocket cmd for {settings.COIN_SYMBOL.upper()}, ignoring update task") + self._active = False + + self._websocket_cmd = self._websocket_cmds[settings.COIN_SYMBOL] + + async def task(self): + if settings.COIN_SYMBOL == "xmr": + return await self._xmr() + elif settings.COIN_SYMBOL == "wow": + return await self._wfs() + + async def _xmr(self) -> List[dict]: + # CCS API is lacking; + # - API returns more `FUNDING-REQUIRED` proposals than there are on the website + # - API does not allow filtering + # - API sometimes breaks; https://hackerone.com/reports/934231 + # we'll web scrape instead + from fapi.factory import app + + content = await httpget(f"{self._http_endpoint}/funding-required/", json=False) + soup = BeautifulSoup(content, "html.parser") + + listings = [] + for listing in soup.findAll("a", {"class": "ffs-idea"}): + try: + item = { + "state": "FUNDING-REQUIRED", + "author": listing.find("p", {"class": "author-list"}).text, + "date": listing.find("p", {"class": "date-list"}).text, + "title": listing.find("h3").text, + "raised_amount": float(listing.find("span", {"class": "progress-number-funded"}).text), + "target_amount": float(listing.find("span", {"class": "progress-number-goal"}).text), + "contributors": 0, + "url": f"{self._http_endpoint}{listing.attrs['href']}" + } + item["percentage_funded"] = item["raised_amount"] * (100 / item["target_amount"]) + if item["percentage_funded"] >= 100: + item["percentage_funded"] = 100.0 + try: + item["contributors"] = int(listing.find("p", {"class": "contributor"}).text.split(" ")[0]) + except: + pass + + href = listing.attrs['href'] + + try: + content = await httpget(f"{self._http_endpoint}{href}", json=False) + try: + soup2 = BeautifulSoup(content, "html.parser") + except Exception as ex: + app.logger.error(f"error parsing ccs HTML page: {ex}") + continue + + try: + instructions = soup2.find("div", {"class": "instructions"}) + if not instructions: + raise Exception("could not parse div.instructions, page probably broken") + address = instructions.find("p", {"class": "string"}).text + if not address.strip(): + raise Exception(f"error fetching ccs HTML: could not parse address") + item["address"] = address.strip() + except Exception as ex: + app.logger.error(f"error parsing ccs address from HTML: {ex}") + continue + except Exception as ex: + app.logger.error(f"error fetching ccs HTML: {ex}") + continue + listings.append(item) + except Exception as ex: + app.logger.error(f"error parsing a ccs item: {ex}") + + return listings + + async def _wfs(self) -> List[dict]: + """https://git.wownero.com/wownero/wownero-funding-system""" + blob = await httpget(f"{self._http_endpoint}/api/1/proposals?offset=0&limit=10&status=2", json=True) + if "data" not in blob: + raise Exception("invalid json response") + + listings = [] + for p in blob['data']: + item = { + "address": p["addr_donation"], + "url": f"{self._http_endpoint}/proposal/{p['id']}", + "state": "FUNDING-REQUIRED", + "date": p['date_posted'], + "title": p['headline'], + 'target_amount': p['funds_target'], + 'raised_amount': round(p['funds_target'] / 100 * p['funded_pct'], 2), + 'contributors': 0, + 'percentage_funded': round(p['funded_pct'], 2), + 'author': p['user'] + } + listings.append(item) + return listings diff --git a/fapi/tasks/rates_crypto.py b/fapi/tasks/rates_crypto.py new file mode 100644 index 0000000..047083d --- /dev/null +++ b/fapi/tasks/rates_crypto.py @@ -0,0 +1,59 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +from typing import List, Union + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class CryptoRatesTask(FeatherTask): + def __init__(self, interval: int = 180): + super(CryptoRatesTask, self).__init__(interval) + + self._cache_key = "crypto_rates" + self._cache_expiry = self.interval * 10 + + self._websocket_cmd = "crypto_rates" + + self._http_api_gecko = "https://api.coingecko.com/api/v3" + + async def task(self) -> Union[List[dict], None]: + """Fetch USD prices for various coins""" + from fapi.factory import app + + url = f"{self._http_api_gecko}/coins/markets?vs_currency=usd" + rates = await httpget(url, json=True) + + # normalize object, too many useless keys + rates = [{ + "id": r["id"], + "symbol": r["symbol"], + "image": r["image"], + "name": r["name"], + "current_price": r["current_price"], + "price_change_percentage_24h": r["price_change_percentage_24h"] + } for r in rates] + + # additional coins as defined by `settings.CRYPTO_RATES_COINS_EXTRA` + for coin, symbol in settings.CRYPTO_RATES_COINS_EXTRA.items(): + url = f"{self._http_api_gecko}/simple/price?ids={coin}&vs_currencies=usd" + try: + data = await httpget(url, json=True) + if coin not in data or "usd" not in data[coin]: + continue + + rates.append({ + "id": coin, + "symbol": symbol, + "image": "", + "name": coin.capitalize(), + "current_price": data[coin]["usd"], + "price_change_percentage_24h": 0.0 + }) + except Exception as ex: + app.logger.error(f"extra coin: {coin}; {ex}") + + return rates diff --git a/fapi/tasks/rates_fiat.py b/fapi/tasks/rates_fiat.py new file mode 100644 index 0000000..aad593f --- /dev/null +++ b/fapi/tasks/rates_fiat.py @@ -0,0 +1,23 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class FiatRatesTask(FeatherTask): + def __init__(self, interval: int = 600): + super(FiatRatesTask, self).__init__(interval) + + self._cache_key = "fiat_rates" + self._cache_expiry = self.interval * 10 + + self._websocket_cmd = "fiat_rates" + + self._http_endpoint = "https://api.exchangeratesapi.io/latest?base=USD" + + async def task(self): + """Fetch fiat rates""" + result = await httpget(self._http_endpoint, json=True) + return result diff --git a/fapi/tasks/reddit.py b/fapi/tasks/reddit.py new file mode 100644 index 0000000..98311fc --- /dev/null +++ b/fapi/tasks/reddit.py @@ -0,0 +1,56 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class RedditTask(FeatherTask): + def __init__(self, interval: int = 900): + from fapi.factory import app + super(RedditTask, self).__init__(interval) + + self._cache_key = "reddit" + self._cache_expiry = self.interval * 10 + + self._websocket_cmd = "reddit" + + self._http_endpoints = { + "xmr": "https://www.reddit.com/r/monero", + "wow": "https://www.reddit.com/r/wownero", + "aeon": "https://www.reddit.com/r/aeon", + "trtl": "https://www.reddit.com/r/TRTL", + "xhv": "https://www.reddit.com/r/havenprotocol", + "loki": "https://www.reddit.com/r/LokiProject" + } + + if settings.COIN_SYMBOL not in self._http_endpoints: + app.logger.warning(f"Missing Reddit URL for {settings.COIN_SYMBOL.upper()}, ignoring update task") + self._active = False + + self._http_endpoint = self._http_endpoints[settings.COIN_SYMBOL] + if self._http_endpoint.endswith("/"): + self._http_endpoint = self._http_endpoint[:-1] + + async def task(self): + from fapi.factory import app + + url = f"{self._http_endpoint}/new.json?limit=15" + try: + blob = await httpget(url, json=True, raise_for_status=True) + except Exception as ex: + app.logger.error(f"failed fetching '{url}' {ex}") + raise + + blob = [{ + 'title': z['data']['title'], + 'author': z['data']['author'], + 'url': "https://old.reddit.com" + z['data']['permalink'], + 'comments': z['data']['num_comments'] + } for z in blob['data']['children']] + if not blob: + raise Exception("no content") + + return blob diff --git a/fapi/tasks/rpc_nodes.py b/fapi/tasks/rpc_nodes.py new file mode 100644 index 0000000..e6453ed --- /dev/null +++ b/fapi/tasks/rpc_nodes.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import json +from typing import List + +import settings +from fapi.utils import httpget, popularity_contest +from fapi.tasks import FeatherTask + + +class RPCNodeCheckTask(FeatherTask): + def __init__(self, interval: int = 60): + super(RPCNodeCheckTask, self).__init__(interval) + + self._cache_key = "rpc_nodes" + self._cache_expiry = None + + self._websocket_cmd = "nodes" + + self._http_timeout = 5 + self._http_timeout_onion = 10 + + async def task(self) -> List[dict]: + """Check RPC nodes status""" + from fapi.factory import app, rpc_nodes, cache + + try: + heights = json.loads(await cache.get("blockheights")) + except: + heights = {} + + nodes = [] + for network_type_coin, _ in rpc_nodes.items(): + data = [] + + for network_type, _nodes in _.items(): + for node in _nodes: + try: + blob = await self.node_check(node, network_type=network_type) + data.append(blob) + except Exception as ex: + app.logger.warning(f"node {node} not reachable; {ex}") + data.append(self._bad_node(**{ + "address": node, + "nettype": network_type_coin, + "type": network_type, + "height": 0 + })) + + # not neccesary for stagenet/testnet nodes to be validated + if network_type_coin != "mainnet": + nodes += data + continue + + if not data: + continue + + # Filter out nodes affected by < v0.17.1.3 sybil attack + data = list(map(lambda node: node if node['target_height'] <= node['height'] + else self._bad_node(**node), data)) + + allowed_offset = 3 + valid_heights = [] + current_blockheight = heights.get(network_type_coin, 0) + + if isinstance(current_blockheight, int) and current_blockheight > 0: + # blockheight from cache has precedence + valid_heights = range(current_blockheight, current_blockheight - allowed_offset, -1) + else: + # popularity contest + common_height = popularity_contest([z['height'] for z in data]) + valid_heights = range(common_height, common_height - allowed_offset, -1) + + data = list(map(lambda node: node if node['height'] in valid_heights + else self._bad_node(**node), data)) + nodes += data + return nodes + + async def node_check(self, node, network_type: str) -> dict: + """Call /get_info on the RPC, return JSON""" + opts = { + "timeout": self._http_timeout, + "json": True + } + + if network_type == "tor": + opts["socks5"] = settings.TOR_SOCKS_PROXY + opts["timeout"] = self._http_timeout_onion + + blob = await httpget(f"http://{node}/get_info", **opts) + for expect in ["nettype", "height", "target_height"]: + if expect not in blob: + raise Exception(f"Invalid JSON response from RPC; expected key '{expect}'") + + height = int(blob.get("height", 0)) + target_height = int(blob.get("target_height", 0)) + + return { + "address": node, + "height": height, + "target_height": target_height, + "online": True, + "nettype": blob["nettype"], + "type": network_type + } + + def _bad_node(self, **kwargs): + return { + "address": kwargs['address'], + "height": kwargs['height'], + "target_height": 0, + "online": False, + "nettype": kwargs['nettype'], + "type": kwargs['type'] + } diff --git a/fapi/tasks/xmrig.py b/fapi/tasks/xmrig.py new file mode 100644 index 0000000..d79cf7c --- /dev/null +++ b/fapi/tasks/xmrig.py @@ -0,0 +1,58 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +from dateutil.parser import parse + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class XmrigTask(FeatherTask): + """Fetches the latest XMRig releases using Github's API""" + def __init__(self, interval: int = 43200): + super(XmrigTask, self).__init__(interval) + + self._cache_key = "xmrig" + self._cache_expiry = self.interval * 10 + + self._websocket_cmd = "xmrig" + + self._http_endpoint = "https://api.github.com/repos/xmrig/xmrig/releases" + + async def task(self) -> dict: + blob = await httpget(self._http_endpoint) + if not isinstance(blob, list) or not blob: + raise Exception(f"Invalid JSON response for {self._http_endpoint}") + blob = blob[0] + + # only uploaded assets + assets = list(filter(lambda k: k['state'] == 'uploaded', blob['assets'])) + + # only archives + assets = list(filter(lambda k: k['name'].endswith(('.tar.gz', '.zip')), assets)) + + version = blob['tag_name'] + data = {} + + # sort by OS + for asset in assets: + operating_system = "linux" + if "msvc" in asset['name'] or "win64" in asset['name']: + operating_system = "windows" + elif "macos" in asset["name"]: + operating_system = "macos" + + data.setdefault(operating_system, []) + data[operating_system].append({ + "name": asset["name"], + "created_at": parse(asset["created_at"]).strftime("%Y-%m-%d"), + "url": f"https://github.com/xmrig/xmrig/releases/download/{version}/{asset['name']}", + "download_count": int(asset["download_count"]) + }) + + return { + "version": version, + "assets": data + } diff --git a/fapi/tasks/xmrto.py b/fapi/tasks/xmrto.py new file mode 100644 index 0000000..7463b9d --- /dev/null +++ b/fapi/tasks/xmrto.py @@ -0,0 +1,26 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2020, The Monero Project. +# Copyright (c) 2020, dsc@xmr.pm + +import settings +from fapi.utils import httpget +from fapi.tasks import FeatherTask + + +class XmrToTask(FeatherTask): + def __init__(self, interval: int = 30): + super(XmrToTask, self).__init__(interval) + + self._cache_key = "xmrto_rates" + self._cache_expiry = self.interval * 10 + + if settings.COIN_MODE == 'stagenet': + self._http_endpoint = "https://test.xmr.to/api/v3/xmr2btc/order_parameter_query/" + else: + self._http_endpoint = "https://xmr.to/api/v3/xmr2btc/order_parameter_query/" + + async def task(self): + result = await httpget(self._http_endpoint) + if "error" in result: + raise Exception(f"${result['error']} ${result['error_msg']}") + return result diff --git a/fapi/utils.py b/fapi/utils.py index 68406dd..73eac73 100644 --- a/fapi/utils.py +++ b/fapi/utils.py @@ -2,55 +2,34 @@ # Copyright (c) 2020, The Monero Project. # Copyright (c) 2020, dsc@xmr.pm -import asyncio import json +import asyncio import os -import re import random -from functools import wraps from datetime import datetime +from collections import Counter +from functools import wraps +from typing import List, Union +import psutil import aiohttp +from aiohttp_socks import ProxyConnector import settings -class BlockHeight: - @staticmethod - async def xmrchain(stagenet: bool = False): - re_blockheight = r"block\/(\d+)\"\>" - url = "https://stagenet.xmrchain.net/" if stagenet else "https://xmrchain.net/" - content = await httpget(url, json=False) - xmrchain = re.findall(re_blockheight, content) - current = max(map(int, xmrchain)) - return current - - @staticmethod - async def xmrto(stagenet: bool = False): - re_blockheight = r"block\/(\d+)\"\>" - url = "https://community.xmr.to/explorer/stagenet/" if stagenet else "https://community.xmr.to/explorer/mainnet/" - content = await httpget(url, json=False) - xmrchain = re.findall(re_blockheight, content) - current = max(map(int, xmrchain)) - return current - - -async def loopyloop(secs: int, func, after_func=None): - """ - asyncio loop - :param secs: interval - :param func: function to execute - :param after_func: function to execute after completion - :return: - """ - while True: - result = await func() - if after_func: - await after_func(result) - - # randomize a bit for Tor anti fingerprint reasons - _secs = random.randrange(secs - 5, secs +5) - await asyncio.sleep(_secs) +def print_banner(): + print(f"""\033[91m + █████▒▓█████ ▄▄▄ ▄▄▄█████▓ ██░ ██ ▓█████ ██▀███ + ▓██ ▒ ▓█ ▀▒████▄ ▓ ██▒ ▓▒▓██░ ██▒▓█ ▀ ▓██ ▒ ██▒ + ▒████ ░ ▒███ ▒██ ▀█▄ ▒ ▓██░ ▒░▒██▀▀██░▒███ ▓██ ░▄█ ▒ + ░▓█▒ ░ ▒▓█ ▄░██▄▄▄▄██░ ▓██▓ ░ ░▓█ ░██ ▒▓█ ▄ ▒██▀▀█▄ + ░▒█░ ░▒████▒▓█ ▓██▒ ▒██▒ ░ ░▓█▒░██▓░▒████▒░██▓ ▒██▒ + ▒ ░ ░░ ▒░ ░▒▒ ▓▒█░ ▒ ░░ ▒ ░░▒░▒░░ ▒░ ░░ ▒▓ ░▒▓░ + ░ ░ ░ ░ ▒ ▒▒ ░ ░ ▒ ░▒░ ░ ░ ░ ░ ░▒ ░ ▒░ + ░ ░ ░ ░ ▒ ░ ░ ░░ ░ ░ ░░ ░ + ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ {settings.COIN_SYMBOL}\033[0m + """.strip()) def collect_websocket(func): @@ -66,32 +45,21 @@ def collect_websocket(func): return wrapper -async def broadcast_blockheight(): - from fapi.factory import connected_websockets, api_data - for queue in connected_websockets: - await queue.put({ - "cmd": "blockheights", - "data": { - "height": api_data.get("blockheights", {}) - } - }) - - -async def broadcast_nodes(): - from fapi.factory import connected_websockets, api_data - for queue in connected_websockets: - await queue.put({ - "cmd": "nodes", - "data": api_data['nodes'] - }) - - -async def httpget(url: str, json=True): - timeout = aiohttp.ClientTimeout(total=30) +async def httpget(url: str, json=True, timeout: int = 5, socks5: str = None, raise_for_status=True): headers = {"User-Agent": random_agent()} - async with aiohttp.ClientSession(timeout=timeout) as session: + opts = {"timeout": aiohttp.ClientTimeout(total=timeout)} + if socks5: + opts['connector'] = ProxyConnector.from_url(socks5) + + async with aiohttp.ClientSession(**opts) as session: async with session.get(url, headers=headers) as response: - return await response.json() if json else await response.text() + if raise_for_status: + response.raise_for_status() + + result = await response.json() if json else await response.text() + if result is None or (isinstance(result, str) and result == ''): + raise Exception("empty response from request") + return result def random_agent(): @@ -99,129 +67,64 @@ def random_agent(): return random.choice(user_agents) -class TxFiatDb: - # historical fiat price db for given symbol - def __init__(self, symbol, block_date_start): - self.fn = "data/fiatdb" - self.symbol = symbol - self.block_start = block_date_start - self._url = "https://www.coingecko.com/price_charts/69/usd/max.json" - self.data = {} - self.load() - - def get(self, year: int, month: int = None): - rtn = {} - if year not in self.data: - return - if not month: - for _m, days in self.data[year].items(): - for day, price in days.items(): - rtn[datetime(year, _m, day).strftime('%Y%m%d')] = price - return rtn - if month not in self.data[year]: - return - for day, price in self.data[year][month].items(): - rtn[datetime(year, month, day).strftime('%Y%m%d')] = price - return rtn - - def load(self): - if not os.path.exists("fiatdb"): - return {} - f = open("fiatdb", "r") - data = f.read() - f.close() +async def feather_data(): + """A collection of data collected by + `FeatherTask`, for Feather wallet clients.""" + from fapi.factory import cache, now + data = await cache.get("data") + if data: data = json.loads(data) + return data - # whatever - self.data = {int(k): {int(_k): {int(__k): __v for __k, __v in _v.items()} for _k, _v in v.items()} for k, v in data.items()} + keys = ["blockheights", "funding_proposals", "crypto_rates", "fiat_rates", "reddit", "rpc_nodes", "xmrig", "xmrto_rates"] + data = {keys[i]: json.loads(val) if val else None for i, val in enumerate(await cache.mget(*keys))} - def write(self): - f = open("fiatdb", "w") - f.write(json.dumps(self.data)) - f.close() + # @TODO: for backward-compat reasons we're including some legacy keys which can be removed after 1.0 release + data['nodes'] = data['rpc_nodes'] + data['ccs'] = data['funding_proposals'] + data['wfs'] = data['funding_proposals'] - async def update(self): - try: - content = await httpget(self._url, json=True) - if not "stats" in content: - raise Exception() - except Exception as ex: - return - - stats = content.get('stats') - if not stats: - return - - year_start = int(self.block_start[:4]) - self.data = {z: {k: {} for k in range(1, 13)} - for z in range(year_start, datetime.now().year + 1)} - content = {z[0]: z[1] for z in stats} - - for k, v in content.items(): - _date = datetime.fromtimestamp(k / 1000) - self.data[_date.year].setdefault(_date.month, {}) - self.data[_date.year][_date.month][_date.day] = v - - self.write() + # start caching when application lifetime is more than 20 seconds + if (datetime.now() - now).total_seconds() > 20: + await cache.setex("data", 30, json.dumps(data)) + return data -class XmrRig: - @staticmethod - async def releases(): - from fapi.factory import app, cache - from fapi.fapi import FeatherApi +def popularity_contest(lst: List[int]) -> Union[int, None]: + """Return most common occurrences of List[int]. If + there are no duplicates, return max() instead. + """ + if not lst: + return + if len(set(lst)) == len(lst): + return max(lst) + return Counter(lst).most_common(1)[0][0] - blob = await FeatherApi.redis_get("xmrig_releases") - if blob and app.config["DEBUG"]: - return blob - try: - result = await httpget(settings.urls["xmrig"]) - if not isinstance(result, list): - raise Exception("JSON response was not a list") - if len(result) <= 1: - raise Exception("JSON response list was 1 or less") - result = result[0] - await cache.set("xmrig_releases", json.dumps(result)) - blob = result - except Exception as ex: - app.logger.error(f"error parsing xmrig blob: {ex}") - if blob: - app.logger.warning(f"passing xmrig output from cache") - return blob +def current_worker_thread_is_primary() -> bool: + """ + ASGI server (Hypercorn) may start multiple + worker threads, but we only want one feather-ws + instance to schedule `FeatherTask` tasks at an + interval. Therefor this function determines if the + current instance is responsible for the + recurring Feather tasks. + """ + from fapi.factory import app - return blob + current_pid = os.getpid() + parent_pid = os.getppid() + app.logger.debug(f"current_pid: {current_pid}, " + f"parent_pid: {parent_pid}") - @staticmethod - async def after_releases(data): - from fapi.factory import api_data - from dateutil.parser import parse - _data = [] - for asset in data['assets']: - for expected in ["tar.gz", ".zip"]: - if asset["state"] != "uploaded": - continue - if asset["name"].endswith(expected): - _data.append(asset) - version = data['tag_name'] - assets = {} + if parent_pid == 0: + return True - for asset in _data: - operating_system = "linux" - if "msvc" in asset['name'] or "win64" in asset['name']: - operating_system = "windows" - elif "macos" in asset["name"]: - operating_system = "macos" + parent = psutil.Process(parent_pid) + if parent.name() != "hypercorn": + return True - assets.setdefault(operating_system, []) - assets[operating_system].append({ - "name": asset["name"], - "created_at": parse(asset["created_at"]).strftime("%Y-%m-%d"), - "url": f"https://github.com/xmrig/xmrig/releases/download/{version}/{asset['name']}", - "download_count": int(asset["download_count"]) - }) + lowest_pid = min(c.pid for c in parent.children(recursive=True) if c.name() == "hypercorn") + if current_pid == lowest_pid: + return True - api_data["xmrig"] = { - "version": version, - "assets": assets - } diff --git a/fapi/wsparse.py b/fapi/wsparse.py index e5a07e8..3ee0335 100644 --- a/fapi/wsparse.py +++ b/fapi/wsparse.py @@ -21,5 +21,5 @@ class WebsocketParse: year = data.get('year') month = data.get('month') - from fapi.factory import txfiatdb - return txfiatdb.get(year, month) + from fapi.tasks.historical_prices import HistoricalPriceTask + return await HistoricalPriceTask.get(year, month) diff --git a/requirements.txt b/requirements.txt index 818d105..7038f33 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ quart aioredis aiohttp +aiofiles quart_session beautifulsoup4 aiohttp_socks -python-dateutil \ No newline at end of file +python-dateutil +psutil \ No newline at end of file diff --git a/run.py b/run.py index e5e9f28..2c40153 100644 --- a/run.py +++ b/run.py @@ -6,4 +6,4 @@ from fapi.factory import create_app import settings app = create_app() -app.run(settings.host, port=settings.port, debug=settings.debug, use_reloader=False) +app.run(settings.HOST, port=settings.PORT, debug=settings.DEBUG, use_reloader=False) diff --git a/settings.py_example b/settings.py_example index 9ec119c..50765c1 100644 --- a/settings.py_example +++ b/settings.py_example @@ -2,29 +2,33 @@ # Copyright (c) 2020, The Monero Project. # Copyright (c) 2020, dsc@xmr.pm -debug = False -host = "127.0.0.1" -port = 1337 -redis_password = None -redis_address = "redis://localhost" -tor_socks = "socks5://127.0.0.1:9050" -rpc_url = "http://127.0.0.1:18089" -xmrchain = "https://stagenet.xmrchain.net" +import os +cwd = os.path.dirname(os.path.realpath(__file__)) -crypto_name = "monero" -crypto_symbol = "xmr" -crypto_block_date_start = "20140418" -urls = { - "xmrig": "https://api.github.com/repos/xmrig/xmrig/releases", - "reddit": "https://www.reddit.com/r/monero/top.json?limit=100", - "ccs": "https://ccs.getmonero.org", - "fiat_rates": "https://api.exchangeratesapi.io/latest?base=USD", - "crypto_rates": "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd", - "crypto_wow_rates": "https://api.coingecko.com/api/v3/simple/price?ids=wownero&vs_currencies=usd" +def bool_env(val): + return val is True or (isinstance(val, str) and (val.lower() == 'true' or val == '1')) + + +DEBUG = bool_env(os.environ.get("FEATHER_DEBUG", False)) +HOST = os.environ.get("FEATHER_HOST", "127.0.0.1") +PORT = int(os.environ.get("FEATHER_PORT", 1337)) + +REDIS_ADDRESS = os.environ.get("FEATHER_REDIS_ADDRESS", "redis://localhost") +REDIS_PASSWORD = os.environ.get("FEATHER_REDIS_PASSWORD") + +COIN_NAME = os.environ.get("FEATHER_COIN_NAME", "monero").lower() # as per coingecko +COIN_SYMBOL = os.environ.get("FEATHER_COIN_SYMBOL", "xmr").lower() # as per coingecko +COIN_GENESIS_DATE = os.environ.get("FEATHER_COIN_GENESIS_DATE", "20140418") +COIN_MODE = os.environ.get("FEATHER_COIN_MODE", "mainnet").lower() + +TOR_SOCKS_PROXY = os.environ.get("FEATHER_TOR_SOCKS_PROXY", "socks5://127.0.0.1:9050") + +# while fetching USD price from coingecko, also include these extra coins: +CRYPTO_RATES_COINS_EXTRA = { + "wownero": "wow", + "aeon": "aeon", + "turtlecoin": "trtl", + "haven": "xhv", + "loki": "loki" } - -if debug: - urls["xmrto_rates"] = "https://test.xmr.to/api/v3/xmr2btc/order_parameter_query/" -else: - urls["xmrto_rates"] = "https://xmr.to/api/v3/xmr2btc/order_parameter_query/"