55 lines
1.5 KiB
Python
55 lines
1.5 KiB
Python
import time
|
|
import asyncio
|
|
import logging
|
|
|
|
from .consts import ADAPTERS
|
|
|
|
|
|
class ServiceWorker:
    """Periodically poll one service through its adapter and store the result.

    On construction the worker schedules a background task on
    ``manager.loop``. Each iteration of that task queries the adapter,
    publishes the result in ``manager.state[name]`` and inserts it into the
    table called ``name`` through ``manager.conn``.

    Attributes:
        name: Service name; also used as the state key and the table name.
        manager: Owner object; this class reads its ``loop``, ``conn`` and
            ``state`` attributes.
        service: Service configuration mapping; the keys ``'adapter'``,
            ``'adapter_args'`` and ``'poll'`` are read.
        adapter: Entry of ``ADAPTERS`` selected by ``service['adapter']``.
        log: Logger named ``elstat.service.<name>``.
        last_poll: ``time.monotonic()`` value taken at the start of the most
            recent poll, or ``None`` before the first poll.
    """

    def __init__(self, manager, name, service):
        """Store the configuration and immediately start the work loop.

        Args:
            manager: Object exposing ``loop``, ``conn`` and ``state``.
            name: Name of the service handled by this worker.
            service: Configuration mapping for the service.

        Raises:
            KeyError: If ``service['adapter']`` is missing or is not a key
                of ``ADAPTERS``.
        """
        self.name = name
        self.manager = manager
        self.service = service
        self.adapter = ADAPTERS[service['adapter']]
        self.log = logging.getLogger(f'elstat.service.{name}')
        self.last_poll = None

        # Strong reference to the background task; filled in by _start().
        self._task = None

        self._start()

    async def work(self):
        """Query the adapter once and return whatever it yields."""
        return await self.adapter.query(self, self.service['adapter_args'])

    async def process_work(self, result):
        """Insert one poll result, prefixed with a timestamp, into the table.

        Args:
            result: Sequence of column values produced by the adapter. Any
                iterable works (tuple, list, ...); a millisecond wall-clock
                timestamp is prepended as the first column.
        """
        columns = self.adapter.spec['db']
        conn = self.manager.conn

        timestamp = int(time.time() * 1000)

        # Unpacking (instead of tuple concatenation) accepts lists and other
        # sequences, not only tuples.
        row = (timestamp, *result)

        # One placeholder per bound value, so the counts cannot drift apart.
        args_str = ','.join(['?'] * len(row))

        # Table and column names cannot be bound as parameters.
        # NOTE(review): they are assumed to come from trusted configuration
        # and the adapter spec, never from user input - confirm.
        query = f"""
        INSERT INTO {self.name} ({','.join(columns)})
        VALUES ({args_str})
        """

        conn.execute(query, row)
        conn.commit()

    async def _work_loop(self):
        """Poll forever, logging failures and retrying after the poll delay.

        The retry is iterative rather than recursive, so repeated failures
        do not pile up nested coroutine frames, and the loop always yields
        to the event loop between attempts. Cancellation is propagated so
        the task can be stopped. A missing ``poll`` key is not retried: it
        raises out of the loop and ends the task.
        """
        while True:
            try:
                self.log.info(f'polling {self.name}')
                self.last_poll = time.monotonic()
                res = await self.work()

                self.manager.state[self.name] = res
                await self.process_work(res)
            except asyncio.CancelledError:
                # On Python < 3.8 CancelledError derives from Exception;
                # never swallow it or the task becomes uncancellable.
                raise
            except Exception:
                self.log.exception('fail on poll, retrying')

            # Sleep after success and failure alike, so a persistently
            # failing adapter cannot spin without yielding.
            await asyncio.sleep(self.service['poll'])

    def _start(self):
        """Schedule the work loop on the manager's event loop."""
        self.log.info(f'starting work loop for {self.name}')

        # The event loop only holds weak references to tasks; keep our own
        # so the work loop is not garbage-collected while running.
        self._task = self.manager.loop.create_task(self._work_loop())
|