wowlet-backend/fapi/utils.py
2020-10-03 15:40:18 +02:00

163 lines
4.9 KiB
Python

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import asyncio
import json
import os
import re
import random
from functools import wraps
from datetime import datetime
import aiohttp
class BlockHeight:
@staticmethod
async def xmrchain(stagenet: bool = False):
re_blockheight = r"block\/(\d+)\"\>"
url = "https://stagenet.xmrchain.net/" if stagenet else "https://xmrchain.net/"
content = await httpget(url, json=False)
xmrchain = re.findall(re_blockheight, content)
current = max(map(int, xmrchain))
return current
@staticmethod
async def xmrto(stagenet: bool = False):
re_blockheight = r"block\/(\d+)\"\>"
url = "https://community.xmr.to/explorer/stagenet/" if stagenet else "https://community.xmr.to/explorer/mainnet/"
content = await httpget(url, json=False)
xmrchain = re.findall(re_blockheight, content)
current = max(map(int, xmrchain))
return current
async def loopyloop(secs: int, func, after_func=None):
"""
asyncio loop
:param secs: interval
:param func: function to execute
:param after_func: function to execute after completion
:return:
"""
while True:
result = await func()
if after_func:
await after_func(result)
# randomize a bit for Tor anti fingerprint reasons
_secs = random.randrange(secs - 5, secs +5)
await asyncio.sleep(_secs)
def collect_websocket(func):
@wraps(func)
async def wrapper(*args, **kwargs):
from fapi.factory import connected_websockets
queue = asyncio.Queue()
connected_websockets.add(queue)
try:
return await func(queue, *args, **kwargs)
finally:
connected_websockets.remove(queue)
return wrapper
async def broadcast_blockheight():
from fapi.factory import connected_websockets, api_data
for queue in connected_websockets:
await queue.put({
"cmd": "blockheights",
"data": {
"height": api_data.get("blockheights", {})
}
})
async def broadcast_nodes():
from fapi.factory import connected_websockets, api_data
for queue in connected_websockets:
await queue.put({
"cmd": "nodes",
"data": api_data['nodes']
})
async def httpget(url: str, json=True):
timeout = aiohttp.ClientTimeout(total=4)
headers = {"User-Agent": random_agent()}
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
return await response.json() if json else await response.text()
def random_agent():
from fapi.factory import user_agents
return random.choice(user_agents)
class TxFiatDb:
# historical fiat price db for given symbol
def __init__(self, symbol, block_date_start):
self.fn = "data/fiatdb"
self.symbol = symbol
self.block_start = block_date_start
self._url = "https://www.coingecko.com/price_charts/69/usd/max.json"
self.data = {}
self.load()
def get(self, year: int, month: int = None):
rtn = {}
if year not in self.data:
return
if not month:
for _m, days in self.data[year].items():
for day, price in days.items():
rtn[datetime(year, _m, day).strftime('%Y%m%d')] = price
return rtn
if month not in self.data[year]:
return
for day, price in self.data[year][month].items():
rtn[datetime(year, month, day).strftime('%Y%m%d')] = price
return rtn
def load(self):
if not os.path.exists("fiatdb"):
return {}
f = open("fiatdb", "r")
data = f.read()
f.close()
data = json.loads(data)
# whatever
self.data = {int(k): {int(_k): {int(__k): __v for __k, __v in _v.items()} for _k, _v in v.items()} for k, v in data.items()}
def write(self):
f = open("fiatdb", "w")
f.write(json.dumps(self.data))
f.close()
async def update(self):
try:
content = await httpget(self._url, json=True)
if not "stats" in content:
raise Exception()
except Exception as ex:
return
stats = content.get('stats')
if not stats:
return
year_start = int(self.block_start[:4])
self.data = {z: {k: {} for k in range(1, 13)}
for z in range(year_start, datetime.now().year + 1)}
content = {z[0]: z[1] for z in stats}
for k, v in content.items():
_date = datetime.fromtimestamp(k / 1000)
self.data[_date.year].setdefault(_date.month, {})
self.data[_date.year][_date.month][_date.day] = v
self.write()