From 7a2f9a95c7b0d1027237f4765dfa54a104f07813 Mon Sep 17 00:00:00 2001
From: Luna Mendes
Date: Thu, 30 Aug 2018 22:22:02 -0300
Subject: [PATCH] add alerts

- adapters: change Adapter._construct to receive an extra arg
- worker: hardcode a bad result and write it out on work loop failure
---
 config.example.py  | 13 ++++++++--
 elstat/adapters.py | 21 ++++++++++++----
 elstat/alerts.py   | 40 ++++++++++++++++++++++++++++++
 elstat/consts.py   |  5 ++++
 elstat/manager.py  | 13 ++++++++--
 elstat/worker.py   | 66 ++++++++++++++++++++++++++++++++++++++++++++----
 6 files changed, 144 insertions(+), 14 deletions(-)
 create mode 100644 elstat/alerts.py

diff --git a/config.example.py b/config.example.py
index 8f3d34c..f2d5a58 100644
--- a/config.example.py
+++ b/config.example.py
@@ -9,7 +9,8 @@ SERVICES = {
         'adapter_args': {
             'url': 'https://elixi.re/api/hello'
         },
-        'poll': 10
+        'poll': 10,
+        'alerts': ['beep'],
     },
     'dabian': {
         'description': 'elixi.re main server',
@@ -17,6 +18,14 @@ SERVICES = {
         'adapter_args': {
             'address': '192.168.1.1'
         },
-        'poll': 15
+        'poll': 15,
+        'alerts': ['beep'],
+    }
+}
+
+ALERTS = {
+    'beep': {
+        'type': 'discord',
+        'url': 'beep boop'
     }
 }
diff --git a/elstat/adapters.py b/elstat/adapters.py
index c09f909..dae01bd 100644
--- a/elstat/adapters.py
+++ b/elstat/adapters.py
@@ -19,7 +19,14 @@ class Adapter:
     @classmethod
     def _construct(cls, *args) -> dict:
         columns = cls.spec['db'][1:]
-        return {col: args[idx] for idx, col in enumerate(columns)}
+
+        base = {col: args[idx] for idx, col in enumerate(columns)}
+
+        # if there's an extra arg, treat it as the error field
+        if len(args) > len(columns):
+            base['error'] = args[-1]
+
+        return base
 
 
 class PingAdapter(Adapter):
@@ -77,12 +84,16 @@ class HttpAdapter(Adapter):
             resp = await session.get(f'{adp_args["url"]}')
             t_end = time.monotonic()
 
+            succ = resp.status == 200
             latency = round((t_end - t_start) * 1000)
 
+            # drop latency to 0 to signal a non-success
+            latency = latency if succ else 0
+
             worker.log.info(f'status={resp.status} latency={latency}ms')
 
-            if resp.status == 200:
-                return cls._construct(True, latency)
+            if not succ:
+                err_str = f'HTTP Status - {resp.status}'
+                return cls._construct(succ, latency, err_str)
 
-            # use 0ms drops as failures
-            return cls._construct(False, 0)
+            return cls._construct(succ, latency if succ else 0)
diff --git a/elstat/alerts.py b/elstat/alerts.py
new file mode 100644
index 0000000..ee653f4
--- /dev/null
+++ b/elstat/alerts.py
@@ -0,0 +1,40 @@
+import logging
+
+from aiohttp import ClientSession
+
+
+log = logging.getLogger(__name__)
+
+
+class DiscordAlert:
+    def __init__(self, alert_name: str, alert: dict):
+        self.name = alert_name
+        self.url = alert['url']
+        self.session = ClientSession()
+
+    def _make_payload(self, service, status):
+        serv_name = service['name']
+        is_up = status['status']
+
+        err = status.get('error', 'No error provided')
+        color = 0x00ff00 if is_up else 0xff0000
+
+        embed = {
+            'title': serv_name,
+            'color': color
+        }
+
+        if not is_up:
+            embed['description'] = err
+
+        return {
+            'content': '',
+            'embeds': [embed],
+        }
+
+    async def post(self, service: dict, status: dict):
+        payload = self._make_payload(service, status)
+
+        log.warning(f'Posting an alert! {status.get("error")}')
+        async with self.session.post(self.url, json=payload) as resp:
+            return resp
diff --git a/elstat/consts.py b/elstat/consts.py
index 662dcfc..595888c 100644
--- a/elstat/consts.py
+++ b/elstat/consts.py
@@ -1,4 +1,5 @@
 from .adapters import HttpAdapter, PingAdapter
+from .alerts import DiscordAlert
 
 
 ADAPTERS = {
@@ -6,6 +7,10 @@ ADAPTERS = {
     'ping': PingAdapter,
 }
 
+ALERTS = {
+    'discord': DiscordAlert
+}
+
 
 class IncidentType:
     OUTAGE = 'outage'
diff --git a/elstat/manager.py b/elstat/manager.py
index e41410f..30155bf 100644
--- a/elstat/manager.py
+++ b/elstat/manager.py
@@ -1,9 +1,9 @@
 import logging
 import json
 
-from typing import List, Dict, Any
+from typing import List, Any
 
-from .consts import ADAPTERS
+from .consts import ADAPTERS, ALERTS
 from .worker import ServiceWorker
 from .blueprints.streaming import OP
@@ -25,6 +25,7 @@ class ServiceManager:
         self.loop = app.loop
 
         self.workers = {}
+        self.alerts = {}
         self.state = {}
         self.subscribers = {}
         self._websockets = {}
@@ -74,14 +75,17 @@ class ServiceManager:
     def _create_channels(self, worker):
         columns = worker.adapter.spec['db']
 
+        # each service has a status and latency channel
         self._check(columns, 'status', worker.name)
         self._check(columns, 'latency', worker.name)
 
     def _start(self):
         self.subscribers['incidents'] = []
 
+        # init services
         for name, service in self.cfg.SERVICES.items():
             self._make_db_table(name, service)
+            service['name'] = name
 
             # spawn a service worker
             serv_worker = ServiceWorker(self, name, service)
@@ -90,6 +94,11 @@ class ServiceManager:
 
             self._create_channels(serv_worker)
 
+        # init alerts
+        for name, alert in self.cfg.ALERTS.items():
+            alert_cls = ALERTS[alert['type']]
+            self.alerts[name] = alert_cls(name, alert)
+
     def close(self):
         for worker in self.workers.values():
             worker.stop()
diff --git a/elstat/worker.py b/elstat/worker.py
index 14ec72e..5933b89 100644
--- a/elstat/worker.py
+++ b/elstat/worker.py
@@ -22,12 +22,20 @@ class ServiceWorker:
     async def process_work(self, result: dict):
         """Process given adapter result and insert into the database."""
+        try:
+            # we work with a copy of the result for the main db
+            # operations (without the error field)
+            db_res = dict(result)
+            db_res.pop('error')
+        except KeyError:
+            pass
+
         columns = self.adapter.spec['db']
         conn = self.manager.conn
 
         timestamp = int(time.time() * 1000)
-        args_str = ','.join(['?'] * (len(result) + 1))
+        args_str = ','.join(['?'] * (len(db_res) + 1))
 
         query = f"""
         INSERT INTO {self.name} ({','.join(columns)}) VALUES ({args_str})
@@ -35,21 +43,61 @@ class ServiceWorker:
         args = []
         for col in columns[1:]:
-            val = result[col]
+            val = db_res[col]
             args.append(val)
 
         conn.execute(query, (timestamp, ) + tuple(args))
         conn.commit()
 
         await self._dispatch_work(columns, timestamp, result)
+        await self._check_alert(result)
 
     async def _dispatch_work(self, columns, timestamp: int, result: tuple):
+        """Dispatch the work done by the adapter
+        through the channels"""
         prechan = columns[1:]
         chans = [f'{chan}:{self.name}' for chan in prechan]
 
         for col, chan in zip(prechan, chans):
             self.manager.publish(chan, [timestamp, result[col]])
 
+    async def _check_alert(self, work):
+        """Check if any alerts should be thrown off by status changes."""
+        cur = self.manager.conn.cursor()
+
+        cur.execute(f"""
+        SELECT status FROM {self.name}
+        ORDER BY timestamp DESC
+        LIMIT 2
+        """)
+
+        rows = cur.fetchall()
+
+        # not enough rows yet to compare against
+        if len(rows) < 2:
+            return
+
+        # extract latest and old from rows
+        first, last = rows
+        first_status, last_status = first[0], last[0]
+
+        # don't do anything if there's no change
+        # to the statuses
+        if first_status == last_status:
+            return
+
+        # oopsie whoopsie time to alertie owo
+        alerts = self.service.get('alerts', [])
+
+        for alert in alerts:
+            try:
+                alert_obj = self.manager.alerts[alert]
+            except KeyError:
+                self.log.error(f'alert not found: {alert!r}')
+                continue
+
+            await alert_obj.post(self.service, work)
+
     async def _work_loop(self):
         try:
             while True:
                 await asyncio.sleep(self.service['poll'])
         except asyncio.CancelledError:
             self.log.info('cancelled, stopping')
-        except Exception:
+        except Exception as err:
             self.log.exception('fail on work loop, retrying')
             try:
-                self.manager.state[self.name]['status'] = False
-                self.manager.publish(f'status:{self.name}', False)
+                # hardcode a bad result on workloop failures
+                result = {
+                    'status': False,
+                    'latency': 0,
+                    'error': str(err),
+                }
+
+                # FORCE EVERYONE TO KNOW ABOUT THAT FAILURE
+                self.manager.state[self.name] = result
+                await self.process_work(result)
             except KeyError:
                 pass
             await self._work_loop()
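
Usage sketch (not part of the diff above): a minimal, hypothetical example of how
the new ALERTS config entry and the DiscordAlert class are meant to fit together,
mirroring what ServiceManager._start and ServiceWorker._check_alert do. The webhook
URL, service name, and status values below are placeholders.

    import asyncio

    from elstat.alerts import DiscordAlert


    async def main():
        # ServiceManager._start builds one alert object per ALERTS entry:
        # alert_cls = ALERTS[alert['type']]; alert_cls(name, alert)
        alert = DiscordAlert('beep', {
            'type': 'discord',
            'url': 'https://discord.com/api/webhooks/...',  # placeholder webhook
        })

        # ServiceWorker._check_alert posts the latest adapter result
        # whenever the stored status flips
        service = {'name': 'elixire'}
        status = {'status': False, 'latency': 0, 'error': 'HTTP Status - 502'}

        resp = await alert.post(service, status)
        print(resp.status)

        await alert.session.close()


    asyncio.run(main())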