Add proper adapter implementations
also add table making for the db
This commit is contained in:
parent
9ed1ec60cf
commit
359c62efd6
4 changed files with 99 additions and 6 deletions
0
elstat/__init__.py
Normal file
0
elstat/__init__.py
Normal file
|
@ -1,9 +1,71 @@
|
|||
class PingAdapter:
|
||||
async def query(worker, adp_args: dict):
|
||||
return None
|
||||
import asyncio
|
||||
import time
|
||||
import re
|
||||
|
||||
PING_RGX = re.compile(r'(.+)( 0% packet loss)(.+)', re.I | re.M)
|
||||
|
||||
|
||||
class ElixireAdapter:
|
||||
async def query(worker, adp_args: dict):
|
||||
return
|
||||
class Adapter:
|
||||
spec = {
|
||||
'db': None,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def query(cls, _worker, _adp_args):
|
||||
"""Main query function."""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PingAdapter(Adapter):
|
||||
"""Ping the given address and report if
|
||||
any packet loss happened."""
|
||||
spec = {
|
||||
'db': ('timestamp', 'status')
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def query(cls, worker, adp_args: dict):
|
||||
process = await asyncio.create_subprocess_shell(
|
||||
f'ping -c 1 {adp_args["address"]}',
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
)
|
||||
|
||||
out, err = map(lambda s: s.decode('utf-8'),
|
||||
await process.communicate())
|
||||
|
||||
alive = bool(re.search(PING_RGX, out + err))
|
||||
worker.log.info(f'{worker.name}: alive? {alive}')
|
||||
|
||||
return alive
|
||||
|
||||
|
||||
class ElixireAdapter(Adapter):
|
||||
"""Adapter to check if a certain
|
||||
elixire instance is reporting well.
|
||||
|
||||
Uses the /api/hello route to determine livelyhood.
|
||||
"""
|
||||
spec = {
|
||||
'db': ('timestamp', 'status', 'latency')
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def query(cls, worker, adp_args: dict):
|
||||
# yes, lots of attributes
|
||||
session = worker.manager.app.session
|
||||
|
||||
t_start = time.monotonic()
|
||||
resp = await session.get(f'{adp_args["base_url"]}/api/hello')
|
||||
t_end = time.monotonic()
|
||||
|
||||
latency = round((t_end - t_start) * 1000)
|
||||
|
||||
worker.log.info(f'{worker.name}: status={resp.status} '
|
||||
f'latency={latency}ms')
|
||||
|
||||
if resp.status == 200:
|
||||
return 200, latency
|
||||
|
||||
# use 0ms drops as failures
|
||||
return False, 0
|
||||
|
|
|
@ -1,5 +1,16 @@
|
|||
import logging
|
||||
|
||||
from .consts import ADAPTERS
|
||||
from .worker import ServiceWorker
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_COLUMNS = {
|
||||
'timestamp': 'timestamp bigint',
|
||||
'status': 'status bool',
|
||||
'latency': 'latency bigint',
|
||||
}
|
||||
|
||||
|
||||
class ServiceManager:
|
||||
def __init__(self, app):
|
||||
|
@ -13,8 +24,25 @@ class ServiceManager:
|
|||
|
||||
self._start()
|
||||
|
||||
def _make_db_table(self, name: str, service: dict):
|
||||
adapter = ADAPTERS[service['adapter']]
|
||||
|
||||
columnstr = map(_COLUMNS.get, adapter.spec['db'])
|
||||
columnstr = ',\n'.join(columnstr)
|
||||
|
||||
log.info(f'Making table for {name}')
|
||||
self.conn.executescript(f"""
|
||||
CREATE TABLE IF NOT EXISTS {name} (
|
||||
{columnstr}
|
||||
);
|
||||
""")
|
||||
|
||||
def _start(self):
|
||||
self.conn.executescript("""
|
||||
""")
|
||||
for name, service in self.cfg.SERVICES.items():
|
||||
self._make_db_table(name, service)
|
||||
|
||||
# spawn a service worker
|
||||
serv_worker = ServiceWorker(self, name, service)
|
||||
self.workers[name] = serv_worker
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import time
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
|
@ -11,6 +12,7 @@ class ServiceWorker:
|
|||
self.service = service
|
||||
self.adapter = ADAPTERS[service['adapter']]
|
||||
self.log = logging.getLogger(f'elstat.service.{name}')
|
||||
self.last_poll = None
|
||||
|
||||
self._start()
|
||||
|
||||
|
@ -22,6 +24,7 @@ class ServiceWorker:
|
|||
try:
|
||||
while True:
|
||||
self.log.info(f'polling {self.name}')
|
||||
self.last_poll = time.monotonic()
|
||||
await self.work()
|
||||
await asyncio.sleep(self.service['poll'])
|
||||
except Exception:
|
||||
|
|
Loading…
Reference in a new issue