elstat/elstat/worker.py

138 lines
4.0 KiB
Python

import time
import asyncio
import logging
from .consts import ADAPTERS
class ServiceWorker:
    """Poll a single service through its adapter and record the results.

    Construction immediately schedules the polling task on ``manager.loop``.
    Each poll:

    * queries the adapter;
    * stores the result in ``manager.state[name]``;
    * inserts a row into the table named after the service;
    * publishes each column on ``<column>:<name>`` channels;
    * posts to the configured alerts when the stored status changes.
    """

    def __init__(self, manager, name, service):
        """Create the worker and start its polling task.

        :param manager: object providing ``conn`` (a connection using ``?``
            placeholders, presumably sqlite3), ``loop``, ``state``,
            ``alerts`` and ``publish``.
        :param name: service name. It is also used as the DB table name and
            as the suffix of the published channel names.
        :param service: service config dict. This class reads the
            ``adapter``, ``adapter_args`` and ``poll`` keys, plus the
            optional ``alerts`` key.
        """
        self.name = name
        self.manager = manager
        self.service = service
        self.adapter = ADAPTERS[service['adapter']]
        self.log = logging.getLogger(f'elstat.service.{name}')

        # time.monotonic() at the start of the latest poll; None until the
        # first poll begins.
        self.last_poll = None

        self._start()

    async def work(self):
        """Run one adapter query for this service and return its result."""
        return await self.adapter.query(self, self.service['adapter_args'])

    async def process_work(self, result: dict):
        """Process given adapter result and insert into the database.

        Row layout:

        * the first column of the adapter's ``spec['db']`` receives the
          current time in milliseconds;
        * every other column is read from ``result``.

        The optional ``'error'`` key is not stored in the DB, but the
        unmodified ``result`` is still passed on to the alerts. Keys that
        are not columns are ignored.

        :raises KeyError: if ``result`` lacks one of the non-timestamp columns.
        """
        # Work on a copy so the caller's dict keeps 'error' for the alerts.
        db_res = dict(result)
        db_res.pop('error', None)

        columns = self.adapter.spec['db']
        conn = self.manager.conn
        timestamp = int(time.time() * 1000)

        # Gather every value first: a missing column raises KeyError before
        # anything is written.
        values = [db_res[col] for col in columns[1:]]

        # One placeholder per column. Sizing this from the result dict would
        # break as soon as an adapter returns an extra key.
        placeholders = ','.join(['?'] * len(columns))

        # NOTE(review): the table name is interpolated verbatim. This is only
        # safe because it presumably comes from trusted config; confirm.
        query = f"""
        INSERT INTO {self.name} ({','.join(columns)})
        VALUES ({placeholders})
        """

        conn.execute(query, (timestamp, *values))
        conn.commit()

        await self._dispatch_work(columns, timestamp, result)
        await self._check_alert(result)

    async def _dispatch_work(self, columns, timestamp: int, result: dict):
        """Dispatch the work done by the adapter through the channels.

        Each non-timestamp column is published on ``<column>:<name>`` as
        ``[timestamp, value]``.
        """
        for col in columns[1:]:
            self.manager.publish(f'{col}:{self.name}', [timestamp, result[col]])

    async def _check_alert(self, work):
        """Check if any alerts should be thrown off by status changes.

        Compares the two most recently stored ``status`` values. If they
        differ, ``work`` is posted to every alert named in the service's
        ``alerts`` config.
        """
        rows = self.manager.conn.execute(f"""
        SELECT status FROM {self.name}
        ORDER BY timestamp DESC
        LIMIT 2
        """).fetchall()

        # The very first poll has nothing to compare against. Unpacking a
        # single row used to raise here; the work loop then stored a fake
        # failure and fired a bogus alert.
        if len(rows) < 2:
            return

        current_status = rows[0][0]
        previous_status = rows[1][0]
        self.log.debug('status current=%r previous=%r',
                       current_status, previous_status)

        # dont do anything if theres no change to the statuses
        if current_status == previous_status:
            return

        alerts = self.service.get('alerts', [])
        if not alerts:
            self.log.warning('no alerts set for %s', self.name)
            return

        for alert in alerts:
            try:
                alert_obj = self.manager.alerts[alert]
            except KeyError:
                self.log.error('alert not found: %r', alert)
                continue

            await alert_obj.post(self.service, work)

    async def _poll_once(self):
        """Query the adapter once and record the result."""
        self.last_poll = time.monotonic()
        res = await self.work()
        self.manager.state[self.name] = res
        await self.process_work(res)

    async def _report_failure(self, err):
        """Record a failed poll caused by ``err``.

        The synthetic result is always stored in ``manager.state``. Storing
        it in the DB is best-effort: it only has ``status``, ``latency`` and
        ``error`` keys, so ``process_work`` raises KeyError for adapters
        with other columns.
        """
        # hardcode a bad result on workloop failures
        result = {
            'status': False,
            'latency': 0,
            'error': str(err),
        }

        # FORCE EVERYONE TO KNOW ABOUT THAT FAILURE
        self.manager.state[self.name] = result

        try:
            await self.process_work(result)
        except KeyError as missing:
            self.log.warning(
                'failure result not stored, missing column %s', missing)

    async def _work_loop(self):
        """Poll the service every ``service['poll']`` seconds until cancelled.

        A failing poll is logged and recorded as a failed result. It is then
        retried after the normal poll interval. The loop is iterative, so
        repeated failures neither grow the call stack nor spin in a tight
        retry loop.
        """
        try:
            while True:
                try:
                    await self._poll_once()
                except asyncio.CancelledError:
                    # CancelledError subclasses Exception before Python 3.8;
                    # keep it out of the failure handler below.
                    raise
                except Exception as err:
                    self.log.exception('fail on work loop, retrying')
                    await self._report_failure(err)

                await asyncio.sleep(self.service['poll'])
        except asyncio.CancelledError:
            self.log.info('cancelled, stopping')

    def stop(self):
        """Stop the current worker."""
        self._work_task.cancel()

    def _start(self):
        """Schedule the polling loop on the manager's event loop."""
        self.log.info(f'starting work loop for {self.name}')
        self._work_task = self.manager.loop.create_task(self._work_loop())