import datetime import logging from aiohttp import ClientSession log = logging.getLogger(__name__) class DiscordAlert: def __init__(self, alert_name: str, alert: dict): self.name = alert_name self.url = alert['url'] self.session = ClientSession() def _make_payload(self, worker, status: dict): service = worker.service serv_name = service['name'] is_up = status['status'] err = status.get('error', 'No error provided') color = 0x00ff00 if is_up else 0xff0000 ts = status['timestamp'] / 1000 dt_iso = datetime.datetime.utcfromtimestamp(ts).isoformat() embed = { 'title': serv_name, 'color': color, 'timestamp': dt_iso } if not is_up: embed['description'] = f'error: {err}' else: # calculate downtime here # first we need to find the work that had a success # before the current one conn = worker.manager.conn cur = conn.cursor() # find the last work that had a success cur.execute(f""" SELECT timestamp FROM {worker.name} WHERE timestamp < ? AND status = 1 ORDER BY timestamp DESC LIMIT 1 """, (status['timestamp'], )) row = cur.fetchone() ts_success = row[0] # now we fetch all the downtime after that ts_success cur.execute(f""" SELECT COUNT(*) FROM {worker.name} WHERE timestamp > ? AND status = 0 """, (ts_success,)) row = cur.fetchone() count = row[0] downtime_msec = count * worker.service['poll'] * 1000 downtime_sec = downtime_msec / 1000 downtime_min = round(downtime_sec / 60, 3) embed['footer'] = { 'text': f'down for {downtime_min} minutes ' f'({downtime_sec} seconds)' } return { 'content': '', 'embeds': [embed], } async def post(self, service: dict, status: dict): payload = self._make_payload(service, status) log.warning(f'Posting an alert! {status.get("error")}') async with self.session.post(self.url, json=payload) as resp: return resp