wowlet-backend/fapi/tasks/__init__.py
dsc 42bb0c832e Feather-ws rewrite;
- Move recurring tasks into their own class; inherits from `FeatherTask`
- CCS proposals: Don't use API, it's broken - webcrawl instead until it is fixed.
- Switch to hypercorn as the ASGI server, *with* support for multiple workers. You can now run feather-ws with, for example, `--workers 6`. See `Dockerfile`.
- Introduce support for various coins under `BlockheightTask`
- Introduce support for various Reddit communities under `RedditTask`
- Introduced weightvoting whilst validating third-party RPC blockheights - where nodes are filtered based on what other nodes are commonly reporting.
- Current blockheights are fetched from various block explorers and weightvoting is done to eliminate outliers under `BlockheightTask`.
- Don't filter/remove bad nodes from the rpc_nodes list; correctly label them as disabled/bad nodes.
- Multiple Feather instances (each for its own coin) can now run on one machine, using only one Redis instance, as each coin has its own Redis database index.
- Configuration options inside `settings.py` can now be controlled via environment variables.
- Better logging through custom log formatting and correct usage of `app.logger.*`
- Fixed a bug where one task could overlap with itself if the previous one did not finish yet. This was particularly noticeable inside the `RPCNodeCheckTask` where the high timeout (for Tor nodes) could cause the task to run *longer* than the recurring task interval.
- Introduced a `docker-compose.yml` to combine the Feather container with Redis and Tor containers.
- Blocking IO operations are now done via `aiofiles`
2020-12-22 19:07:24 +01:00

160 lines
5.5 KiB
Python

# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2020, The Monero Project.
# Copyright (c) 2020, dsc@xmr.pm
import json
import asyncio
import random
from typing import Union
class FeatherTask:
    """
    The base class of many recurring tasks for this
    project. This abstracts away some functionality:

    1. Tasks are automatically cached in Redis if the `_cache_key` is set.
    2. The task result is propagated to connected websocket clients if
       `_websocket_cmd` is set.
    3. Inheritors should implement the `task()` method.
    4. Inheritors can optionally implement the `done()` method.
    """

    def __init__(self, interval: int):
        """
        :param interval: secs between two runs of `task()`; the actual
            delay is jittered by roughly +/- 5 seconds (see `_sleep_seconds`).
        """
        self.interval = interval

        # propagate to websocket clients? (command name used in the message)
        self._websocket_cmd: Union[str, None] = None

        # redis
        self._cache_key: Union[str, None] = None
        self._cache_expiry: Union[int, None] = None

        # logging
        self._qualname: str = f"{self.__class__.__module__}.{self.__class__.__name__}"

        # `_active` lets a task disable itself; `_running` guards against
        # the task overlapping with itself.
        self._active = True
        self._running = False

    async def start(self, *args, **kwargs):
        """Run `task()` in an endless loop until `_active` becomes falsy.

        Positional and keyword arguments are forwarded to `task()` on
        every iteration. Exceptions raised by `task()` are logged and
        answered with a cache fallback; exceptions raised while
        publishing the result (websocket, cache, `done()`) propagate to
        the caller.
        """
        # imported lazily, as in the rest of this module
        from fapi.factory import app

        if not self._active:
            # invalid task
            return

        app.logger.info(f"Starting task {self._qualname}")

        while self._active:
            if self._running:
                # task already running, wait for completion
                await asyncio.sleep(5)
                continue

            self._running = True
            try:
                await self._run_once(*args, **kwargs)
                # the flag intentionally stays set while sleeping
                await asyncio.sleep(self._sleep_seconds())
            finally:
                # always release the guard, even when publishing failed or
                # the coroutine got cancelled; otherwise a later `start()`
                # would wait forever for a run that no longer exists.
                self._running = False

    async def _run_once(self, *args, **kwargs) -> None:
        """Execute a single iteration: run the task, then publish its result."""
        from fapi.factory import app

        try:
            result = await self.task(*args, **kwargs)
            if not result:
                raise Exception("No result")
        except Exception as ex:
            app.logger.error(f"{self._qualname} - {ex}")
            # if the task failed we can attempt to use an old value from the cache.
            result = await self._cached_fallback()
            if not result:
                return

        # optional: propagate result to websocket peers
        if self._websocket_cmd:
            await self._broadcast(result)

        # optional: cache the result
        if self._cache_key:
            await self.cache_set(self._cache_key, result, self._cache_expiry)

        # optional: call completion function
        if self._has_done_hook():
            await self.done(result)

    async def _cached_fallback(self):
        """Return the previously cached result, or `{}` if none is usable."""
        from fapi.factory import app

        if not self._cache_key:
            app.logger.warning(f"{self._qualname} - No cache key for task, skipping")
            return {}

        app.logger.info(f"{self._qualname} - trying cache")
        cached = await self.cache_get(self._cache_key)
        if cached:
            app.logger.warning(f"serving cached result for {self._qualname}")
            return cached

        app.logger.error(f"{self._qualname} - cache lookup failed, fix me")
        return {}

    async def _broadcast(self, result) -> None:
        """Queue `result` for every connected websocket, but only on change."""
        from fapi.factory import connected_websockets

        # Without a cache key there is nothing to compare against, so the
        # result is always sent.
        if self._cache_key:
            cached = await self.cache_get(self._cache_key)
            if cached and self._normalize(cached) == self._normalize(result):
                return

        # iterate over a snapshot so a change to the collection while we
        # are awaiting cannot break the loop
        for queue in tuple(connected_websockets):
            await queue.put({
                "cmd": self._websocket_cmd,
                "data": {
                    self._websocket_cmd: result
                }
            })

    @staticmethod
    def _normalize(value) -> str:
        """Serialize `value` to JSON with sorted keys, for order-independent comparison."""
        return json.dumps(value, sort_keys=True)

    def _sleep_seconds(self) -> int:
        """Return the jittered delay (whole seconds) until the next run.

        The delay is drawn from `[interval - 5, interval + 5)`, with the
        lower bound clamped to 1 so that small intervals can never produce
        a zero or negative sleep (which would turn the loop into a busy loop).
        """
        low = max(1, self.interval - 5)
        high = max(low + 1, self.interval + 5)
        return random.randrange(low, high)

    def _has_done_hook(self) -> bool:
        """Tell whether this class, or any class it inherits from, overrides `done()`."""
        return type(self).done is not FeatherTask.done

    async def task(self, *args, **kwargs):
        """The actual work; inheritors must implement this and return a truthy result."""
        raise NotImplementedError()

    async def done(self, *args, **kwargs):
        """overload this method to execute this function after
        completion of `task`. Results from `task` are parameters
        for `done`."""
        raise NotImplementedError()

    async def end(self, result: dict):
        """Not implemented in the base class and not called from within it."""
        raise NotImplementedError()

    async def cache_get(self, key: str) -> dict:
        """Fetch `key` from the cache and JSON-decode it.

        :return: the decoded value, or `{}` when the key is missing/empty
            or when the lookup or decoding failed (the error is logged).
        """
        from fapi.factory import app, cache

        try:
            data = await cache.get(key)
            if not data:
                return {}
            return json.loads(data)
        except Exception as ex:
            app.logger.error(f"Redis GET error with key '{key}': {ex}")
            return {}

    async def cache_set(self, key, val: Union[dict, int], expiry: int = 0) -> bool:
        """JSON-encode `val` and store it under `key`.

        :param expiry: secs; a positive int stores the value with an
            expiry, anything else stores it without one.
        :return: True on success, False when encoding or storing failed
            (the error is logged).
        """
        from fapi.factory import app, cache

        try:
            data = json.dumps(val)
            if isinstance(expiry, int) and expiry > 0:
                await cache.setex(key, expiry, data)
            else:
                await cache.set(key, data)
            return True
        except Exception as ex:
            app.logger.error(f"Redis SET error with key '{key}': {ex}")
            return False
from fapi.tasks.proposals import FundingProposalsTask
from fapi.tasks.historical_prices import HistoricalPriceTask
from fapi.tasks.blockheight import BlockheightTask
from fapi.tasks.rates_fiat import FiatRatesTask
from fapi.tasks.rates_crypto import CryptoRatesTask
from fapi.tasks.reddit import RedditTask
from fapi.tasks.rpc_nodes import RPCNodeCheckTask
from fapi.tasks.xmrig import XmrigTask
from fapi.tasks.xmrto import XmrToTask