add alerts

- worker: change Adapter._construct to receive an extra arg
- worker: hardcode a bad result and write it off on workloop fail
parent a04d44e7de
commit 7a2f9a95c7

6 changed files with 140 additions and 14 deletions
config (SERVICES / ALERTS):

@@ -9,7 +9,8 @@ SERVICES = {
         'adapter_args': {
             'url': 'https://elixi.re/api/hello'
         },
-        'poll': 10
+        'poll': 10,
+        'alerts': ['beep'],
     },
     'dabian': {
         'description': 'elixi.re main server',
@@ -17,6 +18,14 @@ SERVICES = {
         'adapter_args': {
             'address': '192.168.1.1'
         },
-        'poll': 15
+        'poll': 15,
+        'alerts': ['beep'],
+    }
+}
+
+ALERTS = {
+    'bepsi': {
+        'type': 'discord',
+        'url': 'beep boop'
     }
 }
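The per-service 'alerts' list is resolved by name against the top-level ALERTS dict at runtime. A minimal standalone sketch of that lookup, using the values from this diff; note that 'beep' has no matching key in ALERTS ('bepsi' does), so the worker's 'alert not found' branch would fire for this exact config:

SERVICES = {
    'dabian': {
        'description': 'elixi.re main server',
        'poll': 15,
        'alerts': ['beep'],
    },
}

ALERTS = {
    'bepsi': {
        'type': 'discord',
        'url': 'beep boop'
    }
}

for name, service in SERVICES.items():
    for alert_name in service.get('alerts', []):
        if alert_name not in ALERTS:
            # with this config 'beep' != 'bepsi', so this prints
            print(f'alert not found: {alert_name!r}')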
elstat/adapters.py:

@@ -19,7 +19,14 @@ class Adapter:
     @classmethod
     def _construct(cls, *args) -> dict:
         columns = cls.spec['db'][1:]
-        return {col: args[idx] for idx, col in enumerate(columns)}
+        base = {col: args[idx] for idx, col in enumerate(columns)}
+
+        # if theres an extra arg, treat it as the error field
+        if len(args) > len(columns):
+            base['error'] = args[len(args) - 1]
+
+        return base
 
 
 class PingAdapter(Adapter):
@@ -77,12 +84,16 @@ class HttpAdapter(Adapter):
             resp = await session.get(f'{adp_args["url"]}')
             t_end = time.monotonic()
 
+            succ = resp.status == 200
             latency = round((t_end - t_start) * 1000)
 
+            # drop latency to 0 to signal a non-success
+            latency = latency if succ else 0
+
             worker.log.info(f'status={resp.status} latency={latency}ms')
 
-            if resp.status == 200:
-                return cls._construct(True, latency)
+            if not succ:
+                err_str = f'HTTP Status - {resp.status}'
+                return cls._construct(succ, latency, err_str)
 
-            return cls._construct(False, 0)
+            # use 0ms drops as failures
+            return cls._construct(succ, latency if succ else 0)
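A standalone sketch of the new _construct contract; the spec below is an assumption modeled on the timestamp/status/latency columns the rest of the diff works with:

class DemoAdapter:
    # hypothetical spec; real adapters define their own 'db' columns
    spec = {'db': ('timestamp', 'status', 'latency')}

    @classmethod
    def _construct(cls, *args) -> dict:
        # the first column (timestamp) is filled in by the worker, not here
        columns = cls.spec['db'][1:]
        base = {col: args[idx] for idx, col in enumerate(columns)}

        # an extra positional arg becomes the error field
        if len(args) > len(columns):
            base['error'] = args[len(args) - 1]

        return base


print(DemoAdapter._construct(True, 123))
# {'status': True, 'latency': 123}
print(DemoAdapter._construct(False, 0, 'HTTP Status - 502'))
# {'status': False, 'latency': 0, 'error': 'HTTP Status - 502'}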
elstat/alerts.py (new file, 40 lines):

@@ -0,0 +1,40 @@
+import logging
+
+from aiohttp import ClientSession
+
+
+log = logging.getLogger(__name__)
+
+
+class DiscordAlert:
+    def __init__(self, alert_name: str, alert: dict):
+        self.name = alert_name
+        self.url = alert['url']
+        self.session = ClientSession()
+
+    def _make_payload(self, service, status):
+        serv_name = service['name']
+        is_up = status['status']
+
+        err = status.get('error', 'No error provided')
+        color = 0x00ff00 if is_up else 0xff0000
+
+        embed = {
+            'title': serv_name,
+            'color': color
+        }
+
+        if not is_up:
+            embed['description'] = err
+
+        return {
+            'content': '',
+            'embeds': [embed],
+        }
+
+    async def post(self, service: dict, status: dict):
+        payload = self._make_payload(service, status)
+
+        log.warning(f'Posting an alert! {status.get("error")}')
+        async with self.session.post(self.url, json=payload) as resp:
+            return resp
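A hedged usage sketch for DiscordAlert: the webhook URL is a placeholder, and the payload _make_payload builds ({'content': ..., 'embeds': [...]}) is the shape Discord webhooks accept. Constructing the alert inside the coroutine keeps ClientSession() happy on newer aiohttp, which expects sessions to be created while an event loop is running:

import asyncio

from elstat.alerts import DiscordAlert


async def main():
    # placeholder webhook URL
    alert = DiscordAlert('bepsi', {'url': 'https://discord.com/api/webhooks/...'})

    service = {'name': 'dabian'}
    status = {'status': False, 'error': 'HTTP Status - 502'}

    resp = await alert.post(service, status)
    print(resp.status)

    await alert.session.close()


asyncio.run(main())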
elstat/consts.py:

@@ -1,4 +1,5 @@
 from .adapters import HttpAdapter, PingAdapter
+from .alerts import DiscordAlert
 
 
 ADAPTERS = {
@@ -6,6 +7,10 @@ ADAPTERS = {
     'ping': PingAdapter,
 }
 
+ALERTS = {
+    'discord': DiscordAlert
+}
+
 
 class IncidentType:
     OUTAGE = 'outage'
ServiceManager module:

@@ -1,9 +1,9 @@
 import logging
 import json
 
-from typing import List, Dict, Any
+from typing import List, Any
 
-from .consts import ADAPTERS
+from .consts import ADAPTERS, ALERTS
 from .worker import ServiceWorker
 
 from .blueprints.streaming import OP
@@ -25,6 +25,7 @@ class ServiceManager:
         self.loop = app.loop
 
         self.workers = {}
+        self.alerts = {}
         self.state = {}
         self.subscribers = {}
         self._websockets = {}
@@ -74,14 +75,17 @@ class ServiceManager:
     def _create_channels(self, worker):
         columns = worker.adapter.spec['db']
 
+        # each service has a status and latency channel
         self._check(columns, 'status', worker.name)
         self._check(columns, 'latency', worker.name)
 
     def _start(self):
         self.subscribers['incidents'] = []
 
+        # init services
         for name, service in self.cfg.SERVICES.items():
             self._make_db_table(name, service)
+            service['name'] = name
 
             # spawn a service worker
             serv_worker = ServiceWorker(self, name, service)
@@ -90,6 +94,11 @@ class ServiceManager:
 
         self._create_channels(serv_worker)
 
+        # init alerts
+        for name, alert in self.cfg.ALERTS.items():
+            alert_cls = ALERTS[alert['type']]
+            self.alerts[name] = alert_cls(name, alert)
+
     def close(self):
         for worker in self.workers.values():
             worker.stop()
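Standalone sketch of the new alert bootstrapping in _start(): each cfg.ALERTS entry is resolved through the ALERTS registry by its 'type' and instantiated under its config key. A stub class stands in for elstat.alerts.DiscordAlert so this runs on its own:

class DiscordAlert:
    # stub standing in for elstat.alerts.DiscordAlert
    def __init__(self, alert_name: str, alert: dict):
        self.name = alert_name
        self.url = alert['url']


ALERTS = {'discord': DiscordAlert}

cfg_alerts = {'bepsi': {'type': 'discord', 'url': 'beep boop'}}

alerts = {}
for name, alert in cfg_alerts.items():
    alert_cls = ALERTS[alert['type']]   # KeyError on unknown alert types
    alerts[name] = alert_cls(name, alert)

print(alerts)   # {'bepsi': <__main__.DiscordAlert object at ...>}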
elstat/worker.py:

@@ -22,12 +22,20 @@ class ServiceWorker:
     async def process_work(self, result: dict):
         """Process given adapter result and insert into
         the database."""
+        try:
+            # we work with a copy of result for main db
+            # operations (without error field)
+            db_res = dict(result)
+            db_res.pop('error')
+        except KeyError:
+            pass
+
         columns = self.adapter.spec['db']
         conn = self.manager.conn
 
         timestamp = int(time.time() * 1000)
 
-        args_str = ','.join(['?'] * (len(result) + 1))
+        args_str = ','.join(['?'] * (len(db_res) + 1))
         query = f"""
         INSERT INTO {self.name} ({','.join(columns)})
         VALUES ({args_str})
@@ -35,21 +43,57 @@ class ServiceWorker:
 
         args = []
         for col in columns[1:]:
-            val = result[col]
+            val = db_res[col]
             args.append(val)
 
         conn.execute(query, (timestamp, ) + tuple(args))
         conn.commit()
 
         await self._dispatch_work(columns, timestamp, result)
+        await self._check_alert(result)
 
     async def _dispatch_work(self, columns, timestamp: int, result: tuple):
+        """Dispatch the work done by the adapter
+        through the channels"""
         prechan = columns[1:]
         chans = [f'{chan}:{self.name}' for chan in prechan]
 
         for col, chan in zip(prechan, chans):
             self.manager.publish(chan, [timestamp, result[col]])
 
+    async def _check_alert(self, work):
+        """Check if any alerts should be thrown off by status changes."""
+        cur = self.manager.conn.cursor()
+
+        cur.execute(f"""
+        SELECT status FROM {self.name}
+        ORDER BY timestamp DESC
+        LIMIT 2
+        """)
+
+        rows = cur.fetchall()
+
+        # extract latest and old from rows
+        first, last = rows
+        first_status, last_status = first[0], last[0]
+
+        # dont do anything if theres no change
+        # to the statuses
+        if first_status == last_status:
+            return
+
+        # oopsie whoopsie time to alertie owo
+        alerts = self.service.get('alerts', [])
+
+        for alert in alerts:
+            try:
+                alert_obj = self.manager.alerts[alert]
+            except KeyError:
+                self.log.error(f'alert not found: {alert!r}')
+                continue
+
+            await alert_obj.post(self.service, work)
+
     async def _work_loop(self):
         try:
             while True:
@@ -62,11 +106,19 @@ class ServiceWorker:
                 await asyncio.sleep(self.service['poll'])
         except asyncio.CancelledError:
             self.log.info('cancelled, stopping')
-        except Exception:
+        except Exception as err:
             self.log.exception('fail on work loop, retrying')
             try:
-                self.manager.state[self.name]['status'] = False
-                self.manager.publish(f'status:{self.name}', False)
+                # hardcode a bad result on workloop failures
+                result = {
+                    'status': False,
+                    'latency': 0,
+                    'error': str(err),
+                }
+
+                # FORCE EVERYONE TO KNOW ABOUT THAT FAILURE
+                self.manager.state[self.name] = result
+                await self.process_work(result)
             except KeyError:
                 pass
             await self._work_loop()
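A short standalone walk-through of the new failure path: the hardcoded result keeps the error string for alerting, while process_work strips it before the INSERT (the service tables have no error column). One caveat worth noting: the "first, last = rows" unpack in _check_alert assumes the table already holds at least two rows:

err = RuntimeError('poll blew up')

# what _work_loop now hardcodes on failure
result = {
    'status': False,
    'latency': 0,
    'error': str(err),
}

# what process_work's try/except achieves before the INSERT
db_res = dict(result)
db_res.pop('error', None)

assert db_res == {'status': False, 'latency': 0}
assert result['error'] == 'poll blew up'   # still visible to _check_alert
print('db row:', db_res, '| alert source:', result)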