From 359c62efd69630fb4c108cdcfd567f181f14307d Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Sun, 8 Jul 2018 00:38:14 -0300 Subject: [PATCH] Add proper adapter implementations also add table making for the db --- elstat/__init__.py | 0 elstat/adapters.py | 74 ++++++++++++++++++++++++++++++++++++++++++---- elstat/manager.py | 28 ++++++++++++++++++ elstat/worker.py | 3 ++ 4 files changed, 99 insertions(+), 6 deletions(-) create mode 100644 elstat/__init__.py diff --git a/elstat/__init__.py b/elstat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/elstat/adapters.py b/elstat/adapters.py index 303f3d2..3575037 100644 --- a/elstat/adapters.py +++ b/elstat/adapters.py @@ -1,9 +1,71 @@ -class PingAdapter: - async def query(worker, adp_args: dict): - return None +import asyncio +import time +import re + +PING_RGX = re.compile(r'(.+)( 0% packet loss)(.+)', re.I | re.M) -class ElixireAdapter: - async def query(worker, adp_args: dict): - return +class Adapter: + spec = { + 'db': None, + } + @classmethod + async def query(cls, _worker, _adp_args): + """Main query function.""" + raise NotImplementedError + + +class PingAdapter(Adapter): + """Ping the given address and report if + any packet loss happened.""" + spec = { + 'db': ('timestamp', 'status') + } + + @classmethod + async def query(cls, worker, adp_args: dict): + process = await asyncio.create_subprocess_shell( + f'ping -c 1 {adp_args["address"]}', + stderr=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + ) + + out, err = map(lambda s: s.decode('utf-8'), + await process.communicate()) + + alive = bool(re.search(PING_RGX, out + err)) + worker.log.info(f'{worker.name}: alive? {alive}') + + return alive + + +class ElixireAdapter(Adapter): + """Adapter to check if a certain + elixire instance is reporting well. + + Uses the /api/hello route to determine livelyhood. + """ + spec = { + 'db': ('timestamp', 'status', 'latency') + } + + @classmethod + async def query(cls, worker, adp_args: dict): + # yes, lots of attributes + session = worker.manager.app.session + + t_start = time.monotonic() + resp = await session.get(f'{adp_args["base_url"]}/api/hello') + t_end = time.monotonic() + + latency = round((t_end - t_start) * 1000) + + worker.log.info(f'{worker.name}: status={resp.status} ' + f'latency={latency}ms') + + if resp.status == 200: + return 200, latency + + # use 0ms drops as failures + return False, 0 diff --git a/elstat/manager.py b/elstat/manager.py index a7376a5..da8b6c6 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -1,5 +1,16 @@ +import logging + +from .consts import ADAPTERS from .worker import ServiceWorker +log = logging.getLogger(__name__) + +_COLUMNS = { + 'timestamp': 'timestamp bigint', + 'status': 'status bool', + 'latency': 'latency bigint', +} + class ServiceManager: def __init__(self, app): @@ -13,8 +24,25 @@ class ServiceManager: self._start() + def _make_db_table(self, name: str, service: dict): + adapter = ADAPTERS[service['adapter']] + + columnstr = map(_COLUMNS.get, adapter.spec['db']) + columnstr = ',\n'.join(columnstr) + + log.info(f'Making table for {name}') + self.conn.executescript(f""" + CREATE TABLE IF NOT EXISTS {name} ( + {columnstr} + ); + """) + def _start(self): + self.conn.executescript(""" + """) for name, service in self.cfg.SERVICES.items(): + self._make_db_table(name, service) + # spawn a service worker serv_worker = ServiceWorker(self, name, service) self.workers[name] = serv_worker diff --git a/elstat/worker.py b/elstat/worker.py index 6283a7c..3cdeae8 100644 --- a/elstat/worker.py +++ b/elstat/worker.py @@ -1,3 +1,4 @@ +import time import asyncio import logging @@ -11,6 +12,7 @@ class ServiceWorker: self.service = service self.adapter = ADAPTERS[service['adapter']] self.log = logging.getLogger(f'elstat.service.{name}') + self.last_poll = None self._start() @@ -22,6 +24,7 @@ class ServiceWorker: try: while True: self.log.info(f'polling {self.name}') + self.last_poll = time.monotonic() await self.work() await asyncio.sleep(self.service['poll']) except Exception: