Feather-ws rewrite;

- Move recurring tasks into their own class; inherits from `FeatherTask`
- CCS proposals: Don't use API, it's broken - webcrawl instead until it is fixed.
- Switch to hypercorn as the ASGI server, *with* support for multiple workers. You can now run feather-ws with, for example, `--workers 6`. See `Dockerfile`.
- Introduce support for various coins under `BlockheightTask`
- Introduce support for various Reddit communities under `RedditTask`
- Introduced weightvoting whilst validating third-party RPC blockheights - where nodes are filtered based on what other nodes are commonly reporting.
- Current blockheights are fetched from various block explorers and weightvoting is done to eliminate outliers under `BlockheightTask`.
- Don't filter/remove bad nodes from the rpc_nodes list; correctly label them as disabled/bad nodes.
- Multiple Feather instances (each for it's own coin) can now run on one machine, using only one Redis instance, as each coins has it's own Redis database index.
- Configuration options inside `settings.py` can now be controlled via environment variables.
- Better logging through custom log formatting and correct usage of `app.logger.*`
- Fixed a bug where one task could overlap with itself if the previous one did not finish yet. This was particularly noticable inside the `RPCNodeCheckTask` where the high timeout (for Tor nodes) could cause the task to run *longer* than the recurring task interval.
- Introduced a `docker-compose.yml` to combine the Feather container with Redis and Tor containers.
- Blocking IO operations are now done via `aiofiles`
This commit is contained in:
dsc 2020-12-22 19:03:48 +01:00
parent cb4087dd25
commit 42bb0c832e
24 changed files with 1250 additions and 671 deletions

View file

@ -1,9 +1,10 @@
FROM python:3.7 FROM python:3.7
WORKDIR /app
COPY requirements.txt . COPY requirements.txt .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
COPY . . COPY . .
EXPOSE 1337 EXPOSE 1337
CMD ["python3", "-u", "run.py"] CMD ["hypercorn", "--access-logfile", "-", "--workers", "1", "--bind", "0.0.0.0:18200", "asgi:app"]

View file

@ -1,34 +1,55 @@
# feather-ws # feather-ws
This is the back-end websocket server for Feather wallet. Back-end websocket server for Feather wallet.
- Python 3 asyncio - Quart web framework, Py3 asyncio
- Quart web framework
- Redis - Redis
## Coins supported
### Supervisor - Monero
- Wownero
Example config. See also the environment variables `FEATHER_COIN_NAME`, `FEATHER_COIN_SYMBOL`, etc. in `settings.py`.
## Tasks
This websocket server has several scheduled recurring tasks:
- Fetch latest blockheight from various block explorers
- Fetch crypto/fiat exchange rates
- Fetch latest Reddit posts
- Fetch funding proposals
- Check status of RPC nodes (`data/nodes.json`)
When Feather wallet starts up, it will connect to
this websocket server and receive the information
listed above which is necessary for normal operation.
See `fapi.tasks.*` for the various tasks.
## Development
Requires Python 3.7 and higher.
```text
[program:ws]
directory=/home/feather/feather-ws
command=/home/feather/feather-ws/venv/bin/python run.py
autostart=true
autorestart=true
startsecs=6
stdout_logfile=/home/feather/feather-ws/stdout.log
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_capture_maxbytes=1MB
stderr_logfile=/home/feather/feather-ws/stderr.log
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_capture_maxbytes=1MB
user = feather
environment=
HOME="/home/feather",
USER="feather",
PATH="/home/feather/feather-ws/venv/bin"
``` ```
virtualenv -p /usr/bin/python3 venv
source venv/bin/activate
pip install -r requirements.txt
export FEATHER_DEBUG=true
python run.py
```
Note that `run.py` is meant as a development server. For production,
use `asgi.py` with something like hypercorn.
## Docker
In production you may run via docker;
```
docker-compose up
```
Will bind on `http://127.0.0.1:1337`. Modify `docker-compose.yml` if necessary.

6
asgi.py Normal file
View file

@ -0,0 +1,6 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
from fapi.factory import create_app
app = create_app()

View file

View file

@ -1,43 +1,63 @@
{ {
"mainnet": { "xmr": {
"tor": [ "mainnet": {
"fdlnlt5mr5o7lmhg.onion:18081", "tor": [
"xmkwypann4ly64gh.onion:18081", "fdlnlt5mr5o7lmhg.onion:18081",
"xmrtolujkxnlinre.onion:18081", "xmkwypann4ly64gh.onion:18081",
"xmrag4hf5xlabmob.onion:18081", "xmrtolujkxnlinre.onion:18081",
"monero26mmldsallmxok2kwamne4ve3mybvvn2yijsvss7ey63hc4yyd.onion:18081", "xmrag4hf5xlabmob.onion:18081",
"nrw57zxw5zyevn3i.onion:18081", "monero26mmldsallmxok2kwamne4ve3mybvvn2yijsvss7ey63hc4yyd.onion:18081",
"monero5sjoz5xmjn.onion:18081", "nrw57zxw5zyevn3i.onion:18081",
"mxcd4577fldb3ppzy7obmmhnu3tf57gbcbd4qhwr2kxyjj2qi3dnbfqd.onion:18081", "monero5sjoz5xmjn.onion:18081",
"moneroxmrxw44lku6qniyarpwgznpcwml4drq7vb24ppatlcg4kmxpqd.onion:18089", "mxcd4577fldb3ppzy7obmmhnu3tf57gbcbd4qhwr2kxyjj2qi3dnbfqd.onion:18081",
"moneroptqodufzxj.onion:18081", "moneroxmrxw44lku6qniyarpwgznpcwml4drq7vb24ppatlcg4kmxpqd.onion:18089",
"3hvpnd4xejtzcuowvru2wfjum5wjf7synigm44rrizr3k4v5vzam2bad.onion:18081", "moneroptqodufzxj.onion:18081",
"6dsdenp6vjkvqzy4wzsnzn6wixkdzihx3khiumyzieauxuxslmcaeiad.onion:18081", "3hvpnd4xejtzcuowvru2wfjum5wjf7synigm44rrizr3k4v5vzam2bad.onion:18081",
"3t7v5zpcfxq2tocdofdcwxgrldco3elotz3iis4jtbbnscy5alezw7yd.onion:18081" "6dsdenp6vjkvqzy4wzsnzn6wixkdzihx3khiumyzieauxuxslmcaeiad.onion:18081",
], "3t7v5zpcfxq2tocdofdcwxgrldco3elotz3iis4jtbbnscy5alezw7yd.onion:18081"
"clearnet": [ ],
"eu-west.node.xmr.pm:18089", "clearnet": [
"eu-west-2.node.xmr.pm:18089", "eu-west.node.xmr.pm:18089",
"usa-east-va.node.xmr.pm:18089", "eu-west-2.node.xmr.pm:18089",
"canada.node.xmr.pm:18089", "usa-east-va.node.xmr.pm:18089",
"singapore.node.xmr.pm:18089", "canada.node.xmr.pm:18089",
"192.110.160.146:18089", "singapore.node.xmr.pm:18089",
"nodes.hashvault.pro:18081", "192.110.160.146:18089",
"node.supportxmr.com:18081", "nodes.hashvault.pro:18081",
"node.imonero.org:18081", "node.supportxmr.com:18081",
"xmr-node-eu.cakewallet.com:18081", "node.imonero.org:18081",
"xmr-node-usa-east.cakewallet.com:18081", "xmr-node-eu.cakewallet.com:18081",
"node.xmr.pt:18081", "xmr-node-usa-east.cakewallet.com:18081",
"node.xmr.ru:18081", "node.xmr.pt:18081",
"xmr-peer-070.cypherpunklabs.com:18081", "node.xmr.ru:18081",
"xmr.fail:18081" "xmr-peer-070.cypherpunklabs.com:18081",
] "xmr.fail:18081"
]
},
"stagenet": {
"tor": [],
"clearnet": [
"run.your.own.node.xmr.pm:38089",
"super.fast.node.xmr.pm:38089"
]
}
}, },
"stagenet": { "wow": {
"tor": [], "mainnet": {
"clearnet": [ "tor": [
"run.your.own.node.xmr.pm:38089", "wowbuxx535x4exuexja2xfezpwcyznxkofui4ndjiectj4yuh2xheiid.onion:34568"
"super.fast.node.xmr.pm:38089" ],
] "clearnet": [
"wow.pwned.systems:34568",
"global.wownodes.com:34568",
"node.suchwow.xyz:34568",
"super.fast.node.xmr.pm:34568",
"wowbux.org:34568"
]
},
"stagenet": {
"tor": [],
"clearnet": []
}
} }
} }

27
docker-compose.yml Normal file
View file

@ -0,0 +1,27 @@
version: "3"
services:
redis:
container_name: redis
image: "redis:alpine"
command: redis-server
environment:
- REDIS_REPLICATION_MODE=master
tor-node:
image: osminogin/tor-simple
restart: always
feather-ws:
container_name: feather-ws
build:
context: .
dockerfile: Dockerfile
environment:
- FEATHER_DEBUG=false
- FEATHER_PORT=1337
- FEATHER_REDIS_ADDRESS=redis://redis
- FEATHER_TOR_SOCKS_PROXY=socks5://tor-node:9050
- FEATHER_COIN_NAME=monero
- FEATHER_COIN_SYMBOL=xmr
- FEATHER_COIN_MODE=mainnet
ports:
- "1337:1337"

View file

@ -4,77 +4,113 @@
import json import json
import asyncio import asyncio
from typing import List, Set
from datetime import datetime
from quart import Quart from quart import Quart
from quart_session import Session from quart_session import Session
import aioredis import aioredis
from fapi.utils import current_worker_thread_is_primary, print_banner
import settings import settings
app = None now = datetime.now()
app: Quart = None
cache = None cache = None
connected_websockets = set() rpc_nodes: dict = None
api_data = {} user_agents: List[str] = None
user_agents = None connected_websockets: Set[asyncio.Queue] = set()
txfiatdb = None _is_primary_worker_thread = False
print("""\033[91m
async def _setup_nodes(app: Quart):
global rpc_nodes
with open("data/nodes.json", "r") as f:
rpc_nodes = json.loads(f.read()).get(settings.COIN_SYMBOL)
async def _setup_user_agents(app: Quart):
global user_agents
\033[0m with open('data/user_agents.txt', 'r') as f:
""".strip()) user_agents = [l.strip() for l in f.readlines() if l.strip()]
async def _setup_cache(app: Quart): async def _setup_cache(app: Quart):
global cache global cache
# Each coin has it's own Redis DB index; `redis-cli -n $INDEX`
db = {"xmr": 0, "wow": 1, "aeon": 2, "trtl": 3, "msr": 4, "xhv": 5, "loki": 6}[settings.COIN_SYMBOL]
data = { data = {
"address": settings.redis_address "address": settings.REDIS_ADDRESS,
"db": db,
"password": settings.REDIS_PASSWORD if settings.REDIS_PASSWORD else None
} }
if settings.redis_password:
data['password'] = settings.redis_password
cache = await aioredis.create_redis_pool(**data) cache = await aioredis.create_redis_pool(**data)
app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = cache app.config['SESSION_REDIS'] = cache
Session(app) Session(app)
async def _setup_tasks(app: Quart):
"""Schedules a series of tasks at an interval."""
if not _is_primary_worker_thread:
return
from fapi.tasks import (
BlockheightTask, HistoricalPriceTask, FundingProposalsTask,
CryptoRatesTask, FiatRatesTask, RedditTask, RPCNodeCheckTask,
XmrigTask, XmrToTask)
asyncio.create_task(BlockheightTask().start())
asyncio.create_task(HistoricalPriceTask().start())
asyncio.create_task(CryptoRatesTask().start())
asyncio.create_task(FiatRatesTask().start())
asyncio.create_task(RedditTask().start())
asyncio.create_task(RPCNodeCheckTask().start())
asyncio.create_task(XmrigTask().start())
if settings.COIN_SYMBOL in ["xmr", "wow"]:
asyncio.create_task(FundingProposalsTask().start())
if settings.COIN_SYMBOL == "xmr":
asyncio.create_task(XmrToTask().start())
def _setup_logging():
from logging import Formatter
from logging.config import dictConfig
from quart.logging import default_handler
default_handler.setFormatter(Formatter('[%(asctime)s] %(levelname)s in %(funcName)s(): %(message)s (%(pathname)s)'))
dictConfig({
'version': 1,
'loggers': {
'quart.app': {
'level': 'DEBUG' if settings.DEBUG else 'INFO',
},
},
})
def create_app(): def create_app():
global app global app
_setup_logging()
app = Quart(__name__) app = Quart(__name__)
@app.before_serving @app.before_serving
async def startup(): async def startup():
global txfiatdb, user_agents global _is_primary_worker_thread
_is_primary_worker_thread = current_worker_thread_is_primary()
if _is_primary_worker_thread:
print_banner()
await _setup_cache(app) await _setup_cache(app)
loop = asyncio.get_event_loop() await _setup_nodes(app)
await _setup_user_agents(app)
await _setup_tasks(app)
with open('data/nodes.json', 'r') as f:
nodes = json.loads(f.read())
cache.execute('JSON.SET', 'nodes', '.', json.dumps(nodes))
with open('data/user_agents.txt', 'r') as f:
user_agents = [l.strip() for l in f.readlines() if l.strip()]
from fapi.fapi import FeatherApi
from fapi.utils import loopyloop, TxFiatDb, XmrRig
txfiatdb = TxFiatDb(settings.crypto_name, settings.crypto_block_date_start)
loop.create_task(loopyloop(20, FeatherApi.xmrto_rates, FeatherApi.after_xmrto))
loop.create_task(loopyloop(120, FeatherApi.crypto_rates, FeatherApi.after_crypto))
loop.create_task(loopyloop(600, FeatherApi.fiat_rates, FeatherApi.after_fiat))
loop.create_task(loopyloop(300, FeatherApi.ccs, FeatherApi.after_ccs))
loop.create_task(loopyloop(900, FeatherApi.reddit, FeatherApi.after_reddit))
loop.create_task(loopyloop(60, FeatherApi.blockheight, FeatherApi.after_blockheight))
loop.create_task(loopyloop(60, FeatherApi.check_nodes, FeatherApi.after_check_nodes))
loop.create_task(loopyloop(43200, txfiatdb.update))
loop.create_task(loopyloop(43200, XmrRig.releases, XmrRig.after_releases))
import fapi.routes import fapi.routes
return app return app

View file

@ -1,349 +0,0 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import json
import aiohttp
from bs4 import BeautifulSoup
from aiohttp_socks import ProxyType, ProxyConnector, ChainProxyConnector
from fapi.utils import broadcast_blockheight, broadcast_nodes, httpget, BlockHeight
import settings
class FeatherApi:
@staticmethod
async def redis_get(key):
from fapi.factory import app, cache
try:
data = await cache.get(key)
if data:
return json.loads(data)
except Exception as ex:
app.logger.error(f"Redis error: {ex}")
@staticmethod
async def redis_json_get(key, path="."):
from fapi.factory import app, cache
try:
data = await cache.execute('JSON.GET', key, path)
if data:
return json.loads(data)
except Exception as ex:
app.logger.error(f"Redis error: {ex}")
@staticmethod
async def xmrto_rates():
from fapi.factory import app, cache
xmrto_rates = await FeatherApi.redis_get("xmrto_rates")
if xmrto_rates and app.config["DEBUG"]:
return xmrto_rates
try:
result = await httpget(settings.urls["xmrto_rates"])
if not result:
raise Exception("empty response")
if "error" in result:
raise Exception(f"${result['error']} ${result['error_msg']}")
return result
except Exception as ex:
app.logger.error(f"error parsing xmrto_rates blob: {ex}")
return xmrto_rates
@staticmethod
async def after_xmrto(data):
from fapi.factory import app, cache, api_data, connected_websockets
if not data:
return
_data = api_data.get("xmrto_rates", {})
_data = json.dumps(_data, sort_keys=True, indent=4)
if json.dumps(data, sort_keys=True, indent=4) == _data:
return
api_data["xmrto_rates"] = data
@staticmethod
async def crypto_rates():
from fapi.factory import app, cache
crypto_rates = await FeatherApi.redis_get("crypto_rates")
if crypto_rates and app.config["DEBUG"]:
return crypto_rates
result = None
try:
result = await httpget(settings.urls["crypto_rates"])
if not result:
raise Exception("empty response")
crypto_rates = result
except Exception as ex:
app.logger.error(f"error parsing crypto_rates blob: {ex}")
if not result and crypto_rates:
app.logger.warning("USING OLD CACHE FOR CRYPTO RATES")
return crypto_rates
# grab WOW price while we're at it...
try:
_result = await httpget(settings.urls["crypto_wow_rates"])
if not _result:
raise Exception("empty response")
except Exception as ex:
_result = {}
if "wownero" in _result and "usd" in _result["wownero"]:
crypto_rates.append({
"id": "wownero",
"symbol": "wow",
"image": "",
"name": "Wownero",
"current_price": _result["wownero"]["usd"],
"price_change_percentage_24h": 0.0
})
await cache.set("crypto_rates", json.dumps(crypto_rates))
return crypto_rates
@staticmethod
async def after_crypto(data):
from fapi.factory import app, cache, api_data, connected_websockets
if not data:
return
_data = api_data.get("crypto_rates", {})
_data = json.dumps(_data, sort_keys=True, indent=4)
if json.dumps(data, sort_keys=True, indent=4) == _data:
return
_data = []
for obj in data:
_data.append({
"id": obj['id'],
"symbol": obj['symbol'],
"image": obj['image'],
"name": obj['name'],
"current_price": obj['current_price'],
"price_change_percentage_24h": obj['price_change_percentage_24h']
})
api_data["crypto_rates"] = data
for queue in connected_websockets:
await queue.put({
"cmd": "crypto_rates",
"data": {
"crypto_rates": api_data["crypto_rates"]
}
})
@staticmethod
async def fiat_rates():
from fapi.factory import app, cache
fiat_rates = await FeatherApi.redis_get("fiat_rates")
if fiat_rates and app.config["DEBUG"]:
return fiat_rates
try:
result = await httpget(settings.urls["fiat_rates"], json=True)
if not result:
raise Exception("empty response")
await cache.set("fiat_rates", json.dumps(result))
return result
except Exception as ex:
app.logger.error(f"error parsing fiat_rates blob: {ex}")
# old cache
app.logger.warning("USING OLD CACHE FOR FIAT RATES")
return fiat_rates
@staticmethod
async def after_fiat(data):
from fapi.factory import app, cache, api_data, connected_websockets
if not data:
return
_data = api_data.get("fiat_rates", {})
_data = json.dumps(_data, sort_keys=True, indent=4)
if json.dumps(data, sort_keys=True, indent=4) == _data:
return
api_data["fiat_rates"] = data
for queue in connected_websockets:
await queue.put({
"cmd": "fiat_rates",
"data": {
"fiat_rates": api_data["fiat_rates"]
}
})
@staticmethod
async def ccs():
from fapi.factory import app, cache
ccs = await FeatherApi.redis_get("ccs")
if ccs and app.config["DEBUG"]:
return ccs
try:
content = await httpget(f"https://ccs.getmonero.org/index.php/projects", json=True)
data = [p for p in content["data"] if p["state"] == "FUNDING-REQUIRED" and p['address'] != '8Bok6rt3aCYE41d3YxfMfpSBD6rMDeV9cchSM99KwPFi5GHXe28pHXcYzqtej52TQJT4M8zhfyaoCXDoioR7nSfpC7St48K']
for p in data:
p.update({"url": settings.urls['ccs']+'/funding-required/'})
await cache.set("ccs", json.dumps(data))
return data
except Exception as ex:
app.logger.error(f"Error parsing CCS data: {ex}")
return ccs
@staticmethod
async def after_ccs(data):
from fapi.factory import app, cache, api_data, connected_websockets
if not data:
return
_data = api_data.get("ccs", {})
_data = json.dumps(_data, sort_keys=True, indent=4)
if json.dumps(data, sort_keys=True, indent=4) == _data:
return
api_data["ccs"] = data
for queue in connected_websockets:
await queue.put({
"cmd": "ccs",
"data": api_data["ccs"]
})
@staticmethod
async def reddit():
from fapi.factory import app, cache
reddit = await FeatherApi.redis_get("reddit")
if reddit and app.config["DEBUG"]:
return reddit
try:
blob = await httpget(settings.urls["reddit"])
if not blob:
raise Exception("no data from url")
blob = [{
'title': z['data']['title'],
'author': z['data']['author'],
'url': "https://old.reddit.com" + z['data']['permalink'],
'comments': z['data']['num_comments']
} for z in blob['data']['children']]
# success
if blob:
await cache.set("reddit", json.dumps(blob))
return blob
except Exception as ex:
app.logger.error(f"error parsing reddit blob: {ex}")
# old cache
return reddit
@staticmethod
async def after_reddit(data):
from fapi.factory import app, cache, api_data, connected_websockets
if not data:
return
_data = api_data.get("reddit", {})
_data = json.dumps(_data, sort_keys=True, indent=4)
if json.dumps(data, sort_keys=True, indent=4) == _data:
return
api_data["reddit"] = data
for queue in connected_websockets:
await queue.put({
"cmd": "reddit",
"data": api_data["reddit"]
})
@staticmethod
async def blockheight():
from fapi.factory import app, cache
data = {"mainnet": 0, "stagenet": 0}
for stagenet in [False, True]:
try:
data["mainnet" if stagenet is False else "stagenet"] = \
await BlockHeight.xmrchain(stagenet)
except Exception as ex:
app.logger.error(f"Could not fetch blockheight from xmrchain")
try:
data["mainnet" if stagenet is False else "stagenet"] = \
await BlockHeight.xmrto(stagenet)
except:
app.logger.error(f"Could not fetch blockheight from xmr.to")
return data
@staticmethod
async def after_blockheight(data):
from fapi.factory import app, cache, api_data
changed = False
api_data.setdefault("blockheights", {})
if data["mainnet"] > 1 and data["mainnet"] > api_data["blockheights"].get("mainnet", 1):
api_data["blockheights"]["mainnet"] = data["mainnet"]
changed = True
if data["stagenet"] > 1 and data["stagenet"] > api_data["blockheights"].get("stagenet", 1):
api_data["blockheights"]["stagenet"] = data["stagenet"]
changed = True
if changed:
await broadcast_blockheight()
@staticmethod
async def check_nodes():
from fapi.factory import app
nodes = await FeatherApi.redis_json_get("nodes")
data = []
for network_type, network_name in nodes.items():
for k, _nodes in nodes[network_type].items():
for node in _nodes:
timeout = aiohttp.ClientTimeout(total=5)
d = {'timeout': timeout}
if ".onion" in node:
d['connector'] = ProxyConnector.from_url(settings.tor_socks)
d['timeout'] = aiohttp.ClientTimeout(total=12)
try:
async with aiohttp.ClientSession(**d) as session:
async with session.get(f"http://{node}/get_info") as response:
blob = await response.json()
for expect in ["nettype", "height", "target_height"]:
assert expect in blob
_node = {
"address": node,
"height": int(blob["height"]),
"target_height": int(blob["target_height"]),
"online": True,
"nettype": blob["nettype"],
"type": k
}
# Filter out nodes affected by < v0.17.1.3 sybil attack
if _node['target_height'] > _node["height"]:
continue
except Exception as ex:
app.logger.warning(f"node {node} not reachable")
_node = {
"address": node,
"height": 0,
"target_height": 0,
"online": False,
"nettype": network_type,
"type": k
}
data.append(_node)
return data
@staticmethod
async def after_check_nodes(data):
from fapi.factory import api_data
api_data["nodes"] = data
await broadcast_nodes()

View file

@ -4,39 +4,36 @@
import asyncio import asyncio
import json import json
from copy import deepcopy
from quart import websocket, jsonify from quart import websocket, jsonify
from fapi.factory import app from fapi.factory import app
from fapi.wsparse import WebsocketParse from fapi.wsparse import WebsocketParse
from fapi.utils import collect_websocket from fapi.utils import collect_websocket, feather_data
@app.route("/") @app.route("/")
async def root(): async def root():
from fapi.factory import api_data data = await feather_data()
return jsonify(api_data) return jsonify(data)
@app.websocket('/ws') @app.websocket('/ws')
@collect_websocket @collect_websocket
async def ws(queue): async def ws(queue):
from fapi.factory import api_data data = await feather_data()
# blast data on connect # blast available data on connect
_api_data = deepcopy(api_data) # prevent race condition for task_key, task_value in data.items():
for k, v in _api_data.items(): if not task_value:
if not v:
continue continue
await websocket.send(json.dumps({"cmd": k, "data": v}).encode()) await websocket.send(json.dumps({"cmd": task_key, "data": task_value}).encode())
_api_data = None
async def rx(): async def rx():
while True: while True:
data = await websocket.receive() buffer = await websocket.receive()
try: try:
blob = json.loads(data) blob = json.loads(buffer)
if "cmd" not in blob: if "cmd" not in blob:
continue continue
cmd = blob.get('cmd') cmd = blob.get('cmd')

160
fapi/tasks/__init__.py Normal file
View file

@ -0,0 +1,160 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import json
import asyncio
import random
from typing import Union
class FeatherTask:
"""
The base class of many recurring tasks for this
project. This abstracts away some functionality:
1. Tasks are automatically cached in Redis if the `_cache_key` is set.
2. The task result is propagated to connected websocket clients if
`_websocket_cmd` is set.
3. Inheritors should implement the `task()` method.
4. Inheritors can optionally implement the `done()` method.
"""
def __init__(self, interval: int):
"""
:param interval: secs
"""
self.interval = interval
# propogate to websocket clients?
self._websocket_cmd: str = None
# redis
self._cache_key: str = None
self._cache_expiry: int = None
# logging
self._qualname: str = f"{self.__class__.__module__}.{self.__class__.__name__}"
self._active = True
self._running = False
async def start(self, *args, **kwargs):
from fapi.factory import app, connected_websockets
if not self._active:
# invalid task
return
app.logger.info(f"Starting task {self._qualname}")
sleep = lambda: asyncio.sleep(random.randrange(self.interval - 5,
self.interval + 5))
while True:
if not self._active:
# invalid task
return
if self._running:
# task already running, wait for completion
await asyncio.sleep(5)
continue
try:
self._running = True
result: dict = await self.task(*args, **kwargs)
if not result:
raise Exception("No result")
except Exception as ex:
app.logger.error(f"{self._qualname} - {ex}")
# if the task failed we can attempt to use an old value from the cache.
if not self._cache_key:
app.logger.warning(f"{self._qualname} - No cache key for task, skipping")
await sleep()
self._running = False
continue
app.logger.info(f"{self._qualname} - trying cache")
result = await self.cache_get(self._cache_key)
if result:
app.logger.warning(f"serving cached result for {self._qualname}")
else:
app.logger.error(f"{self._qualname} - cache lookup failed, fix me")
await sleep()
self._running = False
continue
# optional: propogate result to websocket peers
if self._websocket_cmd and result:
# but only when there is a change
normalize = lambda k: json.dumps(k, sort_keys=True, indent=4)
propagate = True
cached = await self.cache_get(self._cache_key)
if cached:
if normalize(cached) == normalize(result):
propagate = False
if propagate:
for queue in connected_websockets:
await queue.put({
"cmd": self._websocket_cmd,
"data": {
self._websocket_cmd: result
}
})
# optional: cache the result
if self._cache_key and result:
await self.cache_set(self._cache_key, result, self._cache_expiry)
# optional: call completion function
if 'done' in self.__class__.__dict__:
await self.done(result)
await sleep()
self._running = False
async def task(self, *args, **kwargs):
raise NotImplementedError()
async def done(self, *args, **kwargs):
"""overload this method to execute this function after
completion of `task`. Results from `task` are parameters
for `done`."""
raise NotImplementedError()
async def end(self, result: dict):
raise NotImplementedError()
async def cache_get(self, key: str) -> dict:
from fapi.factory import app, cache
try:
data = await cache.get(key)
if not data:
return {}
return json.loads(data)
except Exception as ex:
app.logger.error(f"Redis GET error with key '{key}': {ex}")
async def cache_set(self, key, val: Union[dict, int], expiry: int = 0) -> bool:
from fapi.factory import app, cache
try:
data = json.dumps(val)
if isinstance(expiry, int) and expiry > 0:
await cache.setex(key, expiry, data)
else:
await cache.set(key, data)
return True
except Exception as ex:
app.logger.error(f"Redis SET error with key '{key}': {ex}")
from fapi.tasks.proposals import FundingProposalsTask
from fapi.tasks.historical_prices import HistoricalPriceTask
from fapi.tasks.blockheight import BlockheightTask
from fapi.tasks.rates_fiat import FiatRatesTask
from fapi.tasks.rates_crypto import CryptoRatesTask
from fapi.tasks.reddit import RedditTask
from fapi.tasks.rpc_nodes import RPCNodeCheckTask
from fapi.tasks.xmrig import XmrigTask
from fapi.tasks.xmrto import XmrToTask

161
fapi/tasks/blockheight.py Normal file
View file

@ -0,0 +1,161 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import re
from typing import Union
from collections import Counter
from functools import partial
import settings
from fapi.utils import httpget, popularity_contest
from fapi.tasks import FeatherTask
class BlockheightTask(FeatherTask):
"""
Fetch latest blockheight using webcrawling. We pick the most popular
height from a list of websites. Arguably this approach has benefits
over querying a (local) Monero RPC instance, as that requires
maintenance, while this solution assumes that (at least) 2 websites
reports the correct height.
"""
def __init__(self, interval: int = 60):
super(BlockheightTask, self).__init__(interval)
self._cache_key = "blockheights"
self._cache_expiry = 90
self._websocket_cmd = "blockheights"
self._fns = {
"xmr": {
"mainnet": [
self._blockchair,
partial(self._onion_explorer, url="https://xmrchain.net/"),
partial(self._onion_explorer, url="https://community.xmr.to/explorer/mainnet/"),
partial(self._onion_explorer, url="https://monero.exan.tech/")
],
"stagenet": [
partial(self._onion_explorer, url="https://stagenet.xmrchain.net/"),
partial(self._onion_explorer, url="https://community.xmr.to/explorer/stagenet/"),
partial(self._onion_explorer, url="https://monero-stagenet.exan.tech/")
]
},
"wow": {
"mainnet": [
partial(self._onion_explorer, url="https://explore.wownero.com/"),
]
},
"aeon": {
"mainnet": [
partial(self._onion_explorer, url="https://aeonblockexplorer.com/"),
],
"stagenet": [
partial(self._onion_explorer, url="http://162.210.173.151:8083/"),
]
},
"trtl": {
"mainnet": [
self._turtlenode,
self._turtlenetwork,
self._l33d4n
]
},
"xhv": {
"mainnet": [
partial(self._onion_explorer, url="https://explorer.havenprotocol.org/")
],
"stagenet": [
partial(self._onion_explorer, url="https://explorer.stagenet.havenprotocol.org/page/1")
]
},
"loki": {
"mainnet": [
partial(self._onion_explorer, url="https://lokiblocks.com/")
],
"testnet": [
partial(self._onion_explorer, url="https://lokitestnet.com/")
]
}
}
async def task(self) -> Union[dict, None]:
from fapi.factory import app
coin_network_types = ["mainnet", "stagenet", "testnet"]
data = {t: 0 for t in coin_network_types}
for coin_network_type in coin_network_types:
if coin_network_type not in self._fns[settings.COIN_SYMBOL]:
continue
heights = []
for fn in self._fns[settings.COIN_SYMBOL][coin_network_type]:
fn_name = fn.func.__name__ if isinstance(fn, partial) else fn.__name__
try:
result = await fn()
heights.append(result)
except Exception as ex:
app.logger.error(f"blockheight fetch failed from {fn_name}(): {ex}")
continue
if heights:
data[coin_network_type] = popularity_contest(heights)
if data["mainnet"] == 0: # only care about mainnet
app.logger.error(f"Failed to parse latest blockheight!")
return
return data
async def _blockchair(self) -> int:
re_blockheight = r"<a href=\".*\">(\d+)</a>"
url = "https://blockchair.com/monero"
content = await httpget(url, json=False, raise_for_status=True)
height = re.findall(re_blockheight, content)
height = max(map(int, height))
return height
async def _wownero(self) -> int:
url = "https://explore.wownero.com/"
return await BlockheightTask._onion_explorer(url)
async def _turtlenode(self) -> int:
url = "https://public.turtlenode.net/info"
blob = await httpget(url, json=True, raise_for_status=True)
height = int(blob.get("height", 0))
if height <= 0:
raise Exception("bad height")
return height
async def _turtlenetwork(self) -> int:
url = "https://tnnode2.turtlenetwork.eu/blocks/height"
blob = await httpget(url, json=True, raise_for_status=True)
height = int(blob.get("height", 0))
if height <= 0:
raise Exception("bad height")
return height
async def _l33d4n(self):
url = "https://blockapi.turtlepay.io/block/header/top"
blob = await httpget(url, json=True, raise_for_status=True)
height = int(blob.get("height", 0))
if height <= 0:
raise Exception("bad height")
return height
@staticmethod
async def _onion_explorer(url):
"""
Pages that are based on:
https://github.com/moneroexamples/onion-monero-blockchain-explorer
"""
re_blockheight = r"block\/(\d+)\"\>"
content = await httpget(url, json=False)
height = re.findall(re_blockheight, content)
height = max(map(int, height))
return height

View file

@ -0,0 +1,113 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import os
import json
from typing import List, Union
from datetime import datetime
import aiofiles
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class HistoricalPriceTask(FeatherTask):
"""
This class manages a historical price (USD) database, saved in a
textfile at `self._path`. A Feather wallet instance will ask
for the historical fiat price database on startup (but only
in chunks of a month for anti-fingerprinting reasons).
The task in this class simply keeps the fiat database
up-to-date locally.
"""
def __init__(self, interval: int = 43200):
super(HistoricalPriceTask, self).__init__(interval)
self._cache_key = f"historical_fiat"
self._path = f"data/historical_prices_{settings.COIN_SYMBOL}.json"
self._http_endpoint = f"https://www.coingecko.com/price_charts/{settings.COIN_NAME}/usd/max.json"
self._year_genesis = int(settings.COIN_GENESIS_DATE[:4])
self._load()
async def task(self) -> Union[dict, None]:
content = await httpget(self._http_endpoint, json=True, raise_for_status=False)
if "stats" not in content:
raise Exception()
stats: List[List] = content.get('stats', []) # [[timestamp,USD],]
if not stats:
return
data = {
year: {
month: {} for month in range(1, 13)
} for year in range(self._year_genesis, datetime.now().year + 1)
}
# timestamp:USD
daily_price_blob = {day[0]: day[1] for day in stats}
# normalize
for timestamp, usd in daily_price_blob.items():
_date = datetime.fromtimestamp(timestamp / 1000)
data[_date.year].setdefault(_date.month, {})
data[_date.year][_date.month][_date.day] = usd
# update local database
await self._write(data)
return data
async def _load(self) -> None:
if not os.path.exists(self._path):
return
async with aiofiles.open(self._path, mode="r") as f:
content = await f.read()
blob = json.loads(content)
# ¯\_(ツ)_/¯
blob = {int(k): {
int(_k): {
int(__k): __v for __k, __v in _v.items()
} for _k, _v in v.items()
} for k, v in blob.items()}
await self.cache_set(self._cache_key, blob)
async def _write(self, blob: dict) -> None:
data = json.dumps(blob, sort_keys=True, indent=4)
async with aiofiles.open(self._path, mode="w") as f:
await f.write(data)
@staticmethod
async def get(year: int, month: int = None) -> Union[dict, None]:
"""This function is called when a Feather wallet client asks
for (a range of) historical fiat information. It returns the
data filtered by the parameters."""
from fapi.factory import cache
blob = await cache.get("historical_fiat")
blob = json.loads(blob)
if year not in blob:
return
rtn = {}
if not month:
for _m, days in blob[year].items():
for day, price in days.items():
rtn[datetime(year, _m, day).strftime('%Y%m%d')] = price
return rtn
if month not in blob[year]:
return
for day, price in blob[year][month].items():
rtn[datetime(year, month, day).strftime('%Y%m%d')] = price
return rtn

138
fapi/tasks/proposals.py Normal file
View file

@ -0,0 +1,138 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
from bs4 import BeautifulSoup
from typing import List
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class FundingProposalsTask(FeatherTask):
"""Fetch funding proposals made by the community."""
def __init__(self, interval: int = 600):
from fapi.factory import app
super(FundingProposalsTask, self).__init__(interval)
self._cache_key = "funding_proposals"
self._cache_expiry = self.interval * 1000
# url
self._http_endpoints = {
"xmr": "https://ccs.getmonero.org",
"wow": "https://funding.wownero.com"
}
if settings.COIN_SYMBOL not in self._http_endpoints:
app.logger.warning(f"Missing proposal URL for {settings.COIN_SYMBOL.upper()}, ignoring update task")
self._active = False
self._http_endpoint = self._http_endpoints[settings.COIN_SYMBOL]
if self._http_endpoint.endswith("/"):
self._http_endpoint = self._http_endpoint[:-1]
# websocket
self._websocket_cmd = "funding_proposals"
self._websocket_cmds = {
"xmr": "ccs",
"wow": "wfs"
}
if settings.COIN_SYMBOL not in self._websocket_cmds:
app.logger.warning(f"Missing websocket cmd for {settings.COIN_SYMBOL.upper()}, ignoring update task")
self._active = False
self._websocket_cmd = self._websocket_cmds[settings.COIN_SYMBOL]
async def task(self):
if settings.COIN_SYMBOL == "xmr":
return await self._xmr()
elif settings.COIN_SYMBOL == "wow":
return await self._wfs()
async def _xmr(self) -> List[dict]:
# CCS API is lacking;
# - API returns more `FUNDING-REQUIRED` proposals than there are on the website
# - API does not allow filtering
# - API sometimes breaks; https://hackerone.com/reports/934231
# we'll web scrape instead
from fapi.factory import app
content = await httpget(f"{self._http_endpoint}/funding-required/", json=False)
soup = BeautifulSoup(content, "html.parser")
listings = []
for listing in soup.findAll("a", {"class": "ffs-idea"}):
try:
item = {
"state": "FUNDING-REQUIRED",
"author": listing.find("p", {"class": "author-list"}).text,
"date": listing.find("p", {"class": "date-list"}).text,
"title": listing.find("h3").text,
"raised_amount": float(listing.find("span", {"class": "progress-number-funded"}).text),
"target_amount": float(listing.find("span", {"class": "progress-number-goal"}).text),
"contributors": 0,
"url": f"{self._http_endpoint}{listing.attrs['href']}"
}
item["percentage_funded"] = item["raised_amount"] * (100 / item["target_amount"])
if item["percentage_funded"] >= 100:
item["percentage_funded"] = 100.0
try:
item["contributors"] = int(listing.find("p", {"class": "contributor"}).text.split(" ")[0])
except:
pass
href = listing.attrs['href']
try:
content = await httpget(f"{self._http_endpoint}{href}", json=False)
try:
soup2 = BeautifulSoup(content, "html.parser")
except Exception as ex:
app.logger.error(f"error parsing ccs HTML page: {ex}")
continue
try:
instructions = soup2.find("div", {"class": "instructions"})
if not instructions:
raise Exception("could not parse div.instructions, page probably broken")
address = instructions.find("p", {"class": "string"}).text
if not address.strip():
raise Exception(f"error fetching ccs HTML: could not parse address")
item["address"] = address.strip()
except Exception as ex:
app.logger.error(f"error parsing ccs address from HTML: {ex}")
continue
except Exception as ex:
app.logger.error(f"error fetching ccs HTML: {ex}")
continue
listings.append(item)
except Exception as ex:
app.logger.error(f"error parsing a ccs item: {ex}")
return listings
async def _wfs(self) -> List[dict]:
"""https://git.wownero.com/wownero/wownero-funding-system"""
blob = await httpget(f"{self._http_endpoint}/api/1/proposals?offset=0&limit=10&status=2", json=True)
if "data" not in blob:
raise Exception("invalid json response")
listings = []
for p in blob['data']:
item = {
"address": p["addr_donation"],
"url": f"{self._http_endpoint}/proposal/{p['id']}",
"state": "FUNDING-REQUIRED",
"date": p['date_posted'],
"title": p['headline'],
'target_amount': p['funds_target'],
'raised_amount': round(p['funds_target'] / 100 * p['funded_pct'], 2),
'contributors': 0,
'percentage_funded': round(p['funded_pct'], 2),
'author': p['user']
}
listings.append(item)
return listings

View file

@ -0,0 +1,59 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
from typing import List, Union
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class CryptoRatesTask(FeatherTask):
def __init__(self, interval: int = 180):
super(CryptoRatesTask, self).__init__(interval)
self._cache_key = "crypto_rates"
self._cache_expiry = self.interval * 10
self._websocket_cmd = "crypto_rates"
self._http_api_gecko = "https://api.coingecko.com/api/v3"
async def task(self) -> Union[List[dict], None]:
"""Fetch USD prices for various coins"""
from fapi.factory import app
url = f"{self._http_api_gecko}/coins/markets?vs_currency=usd"
rates = await httpget(url, json=True)
# normalize object, too many useless keys
rates = [{
"id": r["id"],
"symbol": r["symbol"],
"image": r["image"],
"name": r["name"],
"current_price": r["current_price"],
"price_change_percentage_24h": r["price_change_percentage_24h"]
} for r in rates]
# additional coins as defined by `settings.CRYPTO_RATES_COINS_EXTRA`
for coin, symbol in settings.CRYPTO_RATES_COINS_EXTRA.items():
url = f"{self._http_api_gecko}/simple/price?ids={coin}&vs_currencies=usd"
try:
data = await httpget(url, json=True)
if coin not in data or "usd" not in data[coin]:
continue
rates.append({
"id": coin,
"symbol": symbol,
"image": "",
"name": coin.capitalize(),
"current_price": data[coin]["usd"],
"price_change_percentage_24h": 0.0
})
except Exception as ex:
app.logger.error(f"extra coin: {coin}; {ex}")
return rates

23
fapi/tasks/rates_fiat.py Normal file
View file

@ -0,0 +1,23 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class FiatRatesTask(FeatherTask):
def __init__(self, interval: int = 600):
super(FiatRatesTask, self).__init__(interval)
self._cache_key = "fiat_rates"
self._cache_expiry = self.interval * 10
self._websocket_cmd = "fiat_rates"
self._http_endpoint = "https://api.exchangeratesapi.io/latest?base=USD"
async def task(self):
"""Fetch fiat rates"""
result = await httpget(self._http_endpoint, json=True)
return result

56
fapi/tasks/reddit.py Normal file
View file

@ -0,0 +1,56 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class RedditTask(FeatherTask):
def __init__(self, interval: int = 900):
from fapi.factory import app
super(RedditTask, self).__init__(interval)
self._cache_key = "reddit"
self._cache_expiry = self.interval * 10
self._websocket_cmd = "reddit"
self._http_endpoints = {
"xmr": "https://www.reddit.com/r/monero",
"wow": "https://www.reddit.com/r/wownero",
"aeon": "https://www.reddit.com/r/aeon",
"trtl": "https://www.reddit.com/r/TRTL",
"xhv": "https://www.reddit.com/r/havenprotocol",
"loki": "https://www.reddit.com/r/LokiProject"
}
if settings.COIN_SYMBOL not in self._http_endpoints:
app.logger.warning(f"Missing Reddit URL for {settings.COIN_SYMBOL.upper()}, ignoring update task")
self._active = False
self._http_endpoint = self._http_endpoints[settings.COIN_SYMBOL]
if self._http_endpoint.endswith("/"):
self._http_endpoint = self._http_endpoint[:-1]
async def task(self):
from fapi.factory import app
url = f"{self._http_endpoint}/new.json?limit=15"
try:
blob = await httpget(url, json=True, raise_for_status=True)
except Exception as ex:
app.logger.error(f"failed fetching '{url}' {ex}")
raise
blob = [{
'title': z['data']['title'],
'author': z['data']['author'],
'url': "https://old.reddit.com" + z['data']['permalink'],
'comments': z['data']['num_comments']
} for z in blob['data']['children']]
if not blob:
raise Exception("no content")
return blob

117
fapi/tasks/rpc_nodes.py Normal file
View file

@ -0,0 +1,117 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import json
from typing import List
import settings
from fapi.utils import httpget, popularity_contest
from fapi.tasks import FeatherTask
class RPCNodeCheckTask(FeatherTask):
def __init__(self, interval: int = 60):
super(RPCNodeCheckTask, self).__init__(interval)
self._cache_key = "rpc_nodes"
self._cache_expiry = None
self._websocket_cmd = "nodes"
self._http_timeout = 5
self._http_timeout_onion = 10
async def task(self) -> List[dict]:
"""Check RPC nodes status"""
from fapi.factory import app, rpc_nodes, cache
try:
heights = json.loads(await cache.get("blockheights"))
except:
heights = {}
nodes = []
for network_type_coin, _ in rpc_nodes.items():
data = []
for network_type, _nodes in _.items():
for node in _nodes:
try:
blob = await self.node_check(node, network_type=network_type)
data.append(blob)
except Exception as ex:
app.logger.warning(f"node {node} not reachable; {ex}")
data.append(self._bad_node(**{
"address": node,
"nettype": network_type_coin,
"type": network_type,
"height": 0
}))
# not neccesary for stagenet/testnet nodes to be validated
if network_type_coin != "mainnet":
nodes += data
continue
if not data:
continue
# Filter out nodes affected by < v0.17.1.3 sybil attack
data = list(map(lambda node: node if node['target_height'] <= node['height']
else self._bad_node(**node), data))
allowed_offset = 3
valid_heights = []
current_blockheight = heights.get(network_type_coin, 0)
if isinstance(current_blockheight, int) and current_blockheight > 0:
# blockheight from cache has precedence
valid_heights = range(current_blockheight, current_blockheight - allowed_offset, -1)
else:
# popularity contest
common_height = popularity_contest([z['height'] for z in data])
valid_heights = range(common_height, common_height - allowed_offset, -1)
data = list(map(lambda node: node if node['height'] in valid_heights
else self._bad_node(**node), data))
nodes += data
return nodes
async def node_check(self, node, network_type: str) -> dict:
"""Call /get_info on the RPC, return JSON"""
opts = {
"timeout": self._http_timeout,
"json": True
}
if network_type == "tor":
opts["socks5"] = settings.TOR_SOCKS_PROXY
opts["timeout"] = self._http_timeout_onion
blob = await httpget(f"http://{node}/get_info", **opts)
for expect in ["nettype", "height", "target_height"]:
if expect not in blob:
raise Exception(f"Invalid JSON response from RPC; expected key '{expect}'")
height = int(blob.get("height", 0))
target_height = int(blob.get("target_height", 0))
return {
"address": node,
"height": height,
"target_height": target_height,
"online": True,
"nettype": blob["nettype"],
"type": network_type
}
def _bad_node(self, **kwargs):
return {
"address": kwargs['address'],
"height": kwargs['height'],
"target_height": 0,
"online": False,
"nettype": kwargs['nettype'],
"type": kwargs['type']
}

58
fapi/tasks/xmrig.py Normal file
View file

@ -0,0 +1,58 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
from dateutil.parser import parse
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class XmrigTask(FeatherTask):
"""Fetches the latest XMRig releases using Github's API"""
def __init__(self, interval: int = 43200):
super(XmrigTask, self).__init__(interval)
self._cache_key = "xmrig"
self._cache_expiry = self.interval * 10
self._websocket_cmd = "xmrig"
self._http_endpoint = "https://api.github.com/repos/xmrig/xmrig/releases"
async def task(self) -> dict:
blob = await httpget(self._http_endpoint)
if not isinstance(blob, list) or not blob:
raise Exception(f"Invalid JSON response for {self._http_endpoint}")
blob = blob[0]
# only uploaded assets
assets = list(filter(lambda k: k['state'] == 'uploaded', blob['assets']))
# only archives
assets = list(filter(lambda k: k['name'].endswith(('.tar.gz', '.zip')), assets))
version = blob['tag_name']
data = {}
# sort by OS
for asset in assets:
operating_system = "linux"
if "msvc" in asset['name'] or "win64" in asset['name']:
operating_system = "windows"
elif "macos" in asset["name"]:
operating_system = "macos"
data.setdefault(operating_system, [])
data[operating_system].append({
"name": asset["name"],
"created_at": parse(asset["created_at"]).strftime("%Y-%m-%d"),
"url": f"https://github.com/xmrig/xmrig/releases/download/{version}/{asset['name']}",
"download_count": int(asset["download_count"])
})
return {
"version": version,
"assets": data
}

26
fapi/tasks/xmrto.py Normal file
View file

@ -0,0 +1,26 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import settings
from fapi.utils import httpget
from fapi.tasks import FeatherTask
class XmrToTask(FeatherTask):
def __init__(self, interval: int = 30):
super(XmrToTask, self).__init__(interval)
self._cache_key = "xmrto_rates"
self._cache_expiry = self.interval * 10
if settings.COIN_MODE == 'stagenet':
self._http_endpoint = "https://test.xmr.to/api/v3/xmr2btc/order_parameter_query/"
else:
self._http_endpoint = "https://xmr.to/api/v3/xmr2btc/order_parameter_query/"
async def task(self):
result = await httpget(self._http_endpoint)
if "error" in result:
raise Exception(f"${result['error']} ${result['error_msg']}")
return result

View file

@ -2,55 +2,34 @@
# Copyright (c) 2020, The Monero Project. # Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm # Copyright (c) 2020, dsc@xmr.pm
import asyncio
import json import json
import asyncio
import os import os
import re
import random import random
from functools import wraps
from datetime import datetime from datetime import datetime
from collections import Counter
from functools import wraps
from typing import List, Union
import psutil
import aiohttp import aiohttp
from aiohttp_socks import ProxyConnector
import settings import settings
class BlockHeight: def print_banner():
@staticmethod print(f"""\033[91m
async def xmrchain(stagenet: bool = False):
re_blockheight = r"block\/(\d+)\"\>"
url = "https://stagenet.xmrchain.net/" if stagenet else "https://xmrchain.net/"
content = await httpget(url, json=False)
xmrchain = re.findall(re_blockheight, content)
current = max(map(int, xmrchain))
return current
@staticmethod {settings.COIN_SYMBOL}\033[0m
async def xmrto(stagenet: bool = False): """.strip())
re_blockheight = r"block\/(\d+)\"\>"
url = "https://community.xmr.to/explorer/stagenet/" if stagenet else "https://community.xmr.to/explorer/mainnet/"
content = await httpget(url, json=False)
xmrchain = re.findall(re_blockheight, content)
current = max(map(int, xmrchain))
return current
async def loopyloop(secs: int, func, after_func=None):
"""
asyncio loop
:param secs: interval
:param func: function to execute
:param after_func: function to execute after completion
:return:
"""
while True:
result = await func()
if after_func:
await after_func(result)
# randomize a bit for Tor anti fingerprint reasons
_secs = random.randrange(secs - 5, secs +5)
await asyncio.sleep(_secs)
def collect_websocket(func): def collect_websocket(func):
@ -66,32 +45,21 @@ def collect_websocket(func):
return wrapper return wrapper
async def broadcast_blockheight(): async def httpget(url: str, json=True, timeout: int = 5, socks5: str = None, raise_for_status=True):
from fapi.factory import connected_websockets, api_data
for queue in connected_websockets:
await queue.put({
"cmd": "blockheights",
"data": {
"height": api_data.get("blockheights", {})
}
})
async def broadcast_nodes():
from fapi.factory import connected_websockets, api_data
for queue in connected_websockets:
await queue.put({
"cmd": "nodes",
"data": api_data['nodes']
})
async def httpget(url: str, json=True):
timeout = aiohttp.ClientTimeout(total=30)
headers = {"User-Agent": random_agent()} headers = {"User-Agent": random_agent()}
async with aiohttp.ClientSession(timeout=timeout) as session: opts = {"timeout": aiohttp.ClientTimeout(total=timeout)}
if socks5:
opts['connector'] = ProxyConnector.from_url(socks5)
async with aiohttp.ClientSession(**opts) as session:
async with session.get(url, headers=headers) as response: async with session.get(url, headers=headers) as response:
return await response.json() if json else await response.text() if raise_for_status:
response.raise_for_status()
result = await response.json() if json else await response.text()
if result is None or (isinstance(result, str) and result == ''):
raise Exception("empty response from request")
return result
def random_agent(): def random_agent():
@ -99,129 +67,64 @@ def random_agent():
return random.choice(user_agents) return random.choice(user_agents)
class TxFiatDb: async def feather_data():
# historical fiat price db for given symbol """A collection of data collected by
def __init__(self, symbol, block_date_start): `FeatherTask`, for Feather wallet clients."""
self.fn = "data/fiatdb" from fapi.factory import cache, now
self.symbol = symbol data = await cache.get("data")
self.block_start = block_date_start if data:
self._url = "https://www.coingecko.com/price_charts/69/usd/max.json"
self.data = {}
self.load()
def get(self, year: int, month: int = None):
rtn = {}
if year not in self.data:
return
if not month:
for _m, days in self.data[year].items():
for day, price in days.items():
rtn[datetime(year, _m, day).strftime('%Y%m%d')] = price
return rtn
if month not in self.data[year]:
return
for day, price in self.data[year][month].items():
rtn[datetime(year, month, day).strftime('%Y%m%d')] = price
return rtn
def load(self):
if not os.path.exists("fiatdb"):
return {}
f = open("fiatdb", "r")
data = f.read()
f.close()
data = json.loads(data) data = json.loads(data)
return data
# whatever keys = ["blockheights", "funding_proposals", "crypto_rates", "fiat_rates", "reddit", "rpc_nodes", "xmrig", "xmrto_rates"]
self.data = {int(k): {int(_k): {int(__k): __v for __k, __v in _v.items()} for _k, _v in v.items()} for k, v in data.items()} data = {keys[i]: json.loads(val) if val else None for i, val in enumerate(await cache.mget(*keys))}
def write(self): # @TODO: for backward-compat reasons we're including some legacy keys which can be removed after 1.0 release
f = open("fiatdb", "w") data['nodes'] = data['rpc_nodes']
f.write(json.dumps(self.data)) data['ccs'] = data['funding_proposals']
f.close() data['wfs'] = data['funding_proposals']
async def update(self): # start caching when application lifetime is more than 20 seconds
try: if (datetime.now() - now).total_seconds() > 20:
content = await httpget(self._url, json=True) await cache.setex("data", 30, json.dumps(data))
if not "stats" in content: return data
raise Exception()
except Exception as ex:
return
stats = content.get('stats')
if not stats:
return
year_start = int(self.block_start[:4])
self.data = {z: {k: {} for k in range(1, 13)}
for z in range(year_start, datetime.now().year + 1)}
content = {z[0]: z[1] for z in stats}
for k, v in content.items():
_date = datetime.fromtimestamp(k / 1000)
self.data[_date.year].setdefault(_date.month, {})
self.data[_date.year][_date.month][_date.day] = v
self.write()
class XmrRig: def popularity_contest(lst: List[int]) -> Union[int, None]:
@staticmethod """Return most common occurrences of List[int]. If
async def releases(): there are no duplicates, return max() instead.
from fapi.factory import app, cache """
from fapi.fapi import FeatherApi if not lst:
return
if len(set(lst)) == len(lst):
return max(lst)
return Counter(lst).most_common(1)[0][0]
blob = await FeatherApi.redis_get("xmrig_releases")
if blob and app.config["DEBUG"]:
return blob
try: def current_worker_thread_is_primary() -> bool:
result = await httpget(settings.urls["xmrig"]) """
if not isinstance(result, list): ASGI server (Hypercorn) may start multiple
raise Exception("JSON response was not a list") worker threads, but we only want one feather-ws
if len(result) <= 1: instance to schedule `FeatherTask` tasks at an
raise Exception("JSON response list was 1 or less") interval. Therefor this function determines if the
result = result[0] current instance is responsible for the
await cache.set("xmrig_releases", json.dumps(result)) recurring Feather tasks.
blob = result """
except Exception as ex: from fapi.factory import app
app.logger.error(f"error parsing xmrig blob: {ex}")
if blob:
app.logger.warning(f"passing xmrig output from cache")
return blob
return blob current_pid = os.getpid()
parent_pid = os.getppid()
app.logger.debug(f"current_pid: {current_pid}, "
f"parent_pid: {parent_pid}")
@staticmethod if parent_pid == 0:
async def after_releases(data): return True
from fapi.factory import api_data
from dateutil.parser import parse
_data = []
for asset in data['assets']:
for expected in ["tar.gz", ".zip"]:
if asset["state"] != "uploaded":
continue
if asset["name"].endswith(expected):
_data.append(asset)
version = data['tag_name']
assets = {}
for asset in _data: parent = psutil.Process(parent_pid)
operating_system = "linux" if parent.name() != "hypercorn":
if "msvc" in asset['name'] or "win64" in asset['name']: return True
operating_system = "windows"
elif "macos" in asset["name"]:
operating_system = "macos"
assets.setdefault(operating_system, []) lowest_pid = min(c.pid for c in parent.children(recursive=True) if c.name() == "hypercorn")
assets[operating_system].append({ if current_pid == lowest_pid:
"name": asset["name"], return True
"created_at": parse(asset["created_at"]).strftime("%Y-%m-%d"),
"url": f"https://github.com/xmrig/xmrig/releases/download/{version}/{asset['name']}",
"download_count": int(asset["download_count"])
})
api_data["xmrig"] = {
"version": version,
"assets": assets
}

View file

@ -21,5 +21,5 @@ class WebsocketParse:
year = data.get('year') year = data.get('year')
month = data.get('month') month = data.get('month')
from fapi.factory import txfiatdb from fapi.tasks.historical_prices import HistoricalPriceTask
return txfiatdb.get(year, month) return await HistoricalPriceTask.get(year, month)

View file

@ -1,7 +1,9 @@
quart quart
aioredis aioredis
aiohttp aiohttp
aiofiles
quart_session quart_session
beautifulsoup4 beautifulsoup4
aiohttp_socks aiohttp_socks
python-dateutil python-dateutil
psutil

2
run.py
View file

@ -6,4 +6,4 @@ from fapi.factory import create_app
import settings import settings
app = create_app() app = create_app()
app.run(settings.host, port=settings.port, debug=settings.debug, use_reloader=False) app.run(settings.HOST, port=settings.PORT, debug=settings.DEBUG, use_reloader=False)

View file

@ -2,29 +2,33 @@
# Copyright (c) 2020, The Monero Project. # Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm # Copyright (c) 2020, dsc@xmr.pm
debug = False import os
host = "127.0.0.1" cwd = os.path.dirname(os.path.realpath(__file__))
port = 1337
redis_password = None
redis_address = "redis://localhost"
tor_socks = "socks5://127.0.0.1:9050"
rpc_url = "http://127.0.0.1:18089"
xmrchain = "https://stagenet.xmrchain.net"
crypto_name = "monero"
crypto_symbol = "xmr"
crypto_block_date_start = "20140418"
urls = { def bool_env(val):
"xmrig": "https://api.github.com/repos/xmrig/xmrig/releases", return val is True or (isinstance(val, str) and (val.lower() == 'true' or val == '1'))
"reddit": "https://www.reddit.com/r/monero/top.json?limit=100",
"ccs": "https://ccs.getmonero.org",
"fiat_rates": "https://api.exchangeratesapi.io/latest?base=USD", DEBUG = bool_env(os.environ.get("FEATHER_DEBUG", False))
"crypto_rates": "https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd", HOST = os.environ.get("FEATHER_HOST", "127.0.0.1")
"crypto_wow_rates": "https://api.coingecko.com/api/v3/simple/price?ids=wownero&vs_currencies=usd" PORT = int(os.environ.get("FEATHER_PORT", 1337))
REDIS_ADDRESS = os.environ.get("FEATHER_REDIS_ADDRESS", "redis://localhost")
REDIS_PASSWORD = os.environ.get("FEATHER_REDIS_PASSWORD")
COIN_NAME = os.environ.get("FEATHER_COIN_NAME", "monero").lower() # as per coingecko
COIN_SYMBOL = os.environ.get("FEATHER_COIN_SYMBOL", "xmr").lower() # as per coingecko
COIN_GENESIS_DATE = os.environ.get("FEATHER_COIN_GENESIS_DATE", "20140418")
COIN_MODE = os.environ.get("FEATHER_COIN_MODE", "mainnet").lower()
TOR_SOCKS_PROXY = os.environ.get("FEATHER_TOR_SOCKS_PROXY", "socks5://127.0.0.1:9050")
# while fetching USD price from coingecko, also include these extra coins:
CRYPTO_RATES_COINS_EXTRA = {
"wownero": "wow",
"aeon": "aeon",
"turtlecoin": "trtl",
"haven": "xhv",
"loki": "loki"
} }
if debug:
urls["xmrto_rates"] = "https://test.xmr.to/api/v3/xmr2btc/order_parameter_query/"
else:
urls["xmrto_rates"] = "https://xmr.to/api/v3/xmr2btc/order_parameter_query/"