import time
import asyncio
import logging

from .consts import ADAPTERS


class ServiceWorker:
    """Poll a single service through its adapter on a fixed interval.

    Each poll result is stored in ``manager.state[name]``, inserted into
    the table named after the service, and published column by column
    through ``manager.publish``.

    The background task is scheduled on ``manager.loop`` as soon as the
    worker is constructed; call :meth:`stop` to cancel it.
    """

    def __init__(self, manager, name, service):
        """Set up the worker and immediately schedule its work loop.

        Args:
            manager: Owner object; this class reads its ``conn``, ``loop``
                and ``state`` attributes and calls its ``publish`` method.
            name: Service name. Used as the database table name, the logger
                suffix and the channel suffix.
            service: Service configuration mapping; the keys read here are
                ``'adapter'``, ``'adapter_args'`` and ``'poll'``.

        Raises:
            KeyError: If ``service['adapter']`` is missing or not in ADAPTERS.
        """
        self.name = name
        self.manager = manager
        self.service = service
        self.adapter = ADAPTERS[service['adapter']]
        self.log = logging.getLogger(f'elstat.service.{name}')

        # time.monotonic() value of the most recent poll attempt, or None
        # if no poll has started yet.
        self.last_poll = None

        self._start()

    async def work(self):
        """Run one adapter query and return whatever the adapter returns."""
        return await self.adapter.query(self, self.service['adapter_args'])

    async def process_work(self, result: dict):
        """Process given adapter result and insert into the database.

        The first column of the adapter's ``spec['db']`` receives the
        current time in milliseconds; every remaining column is read from
        ``result`` under the same name.

        Raises:
            KeyError: If ``result`` lacks one of the non-timestamp columns.
        """
        columns = self.adapter.spec['db']
        conn = self.manager.conn

        timestamp = int(time.time() * 1000)

        # One placeholder per inserted column. Deriving the count from
        # ``columns`` (not from ``result``) keeps the statement valid even
        # when the adapter result carries extra keys that are not stored.
        args_str = ','.join(['?'] * len(columns))

        # NOTE(review): the table and column names are interpolated
        # directly; this assumes they come from trusted configuration.
        query = f"""
        INSERT INTO {self.name} ({','.join(columns)})
        VALUES ({args_str})
        """

        values = tuple(result[col] for col in columns[1:])

        conn.execute(query, (timestamp, ) + values)
        conn.commit()

        await self._dispatch_work(columns, timestamp, result)

    async def _dispatch_work(self, columns, timestamp: int, result: dict):
        """Publish ``[timestamp, value]`` on ``'<column>:<name>'`` channels.

        The first entry of ``columns`` is skipped; it is the column that
        ``process_work`` fills with the timestamp.
        """
        for col in columns[1:]:
            self.manager.publish(
                f'{col}:{self.name}', [timestamp, result[col]])

    def _mark_failed(self):
        """Flag the service as down after a work loop failure.

        Nothing is changed or published when there is no stored state for
        this service yet, or when that state has no ``'status'`` slot to
        assign to (both surface as ``KeyError``).
        """
        try:
            self.manager.state[self.name]['status'] = False
            self.manager.publish(f'status:{self.name}', False)
        except KeyError:
            pass

    async def _work_loop(self):
        """Poll forever, retrying after failures, until cancelled.

        A failed iteration is logged, the service is flagged as down, and
        polling restarts right away. The retry is an ordinary loop
        iteration rather than a recursive call, so a service that keeps
        failing cannot build up an unbounded chain of nested coroutines.
        """
        try:
            while True:
                try:
                    self.last_poll = time.monotonic()

                    res = await self.work()
                    self.manager.state[self.name] = res
                    await self.process_work(res)

                    await asyncio.sleep(self.service['poll'])
                except asyncio.CancelledError:
                    # Must be listed before ``except Exception``: on
                    # Python < 3.8 CancelledError subclasses Exception.
                    raise
                except Exception:
                    self.log.exception('fail on work loop, retrying')
                    self._mark_failed()

                    # Give control back to the event loop before retrying
                    # so that other tasks and a pending cancel() still run
                    # even if the failure was raised without ever awaiting.
                    await asyncio.sleep(0)
        except asyncio.CancelledError:
            self.log.info('cancelled, stopping')

    def stop(self):
        """Stop the current worker."""
        self._work_task.cancel()

    def _start(self):
        """Schedule the work loop as a task on the manager's event loop."""
        self.log.info('starting work loop for %s', self.name)
        self._work_task = self.manager.loop.create_task(self._work_loop())