add ServiceManager & ServiceWorker

This commit is contained in:
Luna Mendes 2018-07-07 20:51:43 -03:00
parent 78ae8f27f0
commit 9ed1ec60cf
5 changed files with 72 additions and 0 deletions

9
elstat/adapters.py Normal file
View File

@ -0,0 +1,9 @@
class PingAdapter:
async def query(worker, adp_args: dict):
return None
class ElixireAdapter:
async def query(worker, adp_args: dict):
return

7
elstat/consts.py Normal file
View File

@ -0,0 +1,7 @@
from .adapters import ElixireAdapter, PingAdapter
ADAPTERS = {
'elixire': ElixireAdapter,
'ping': PingAdapter,
}

22
elstat/manager.py Normal file
View File

@ -0,0 +1,22 @@
from .worker import ServiceWorker
class ServiceManager:
def __init__(self, app):
self.app = app
self.cfg = app.cfg
self.conn = app.conn
self.loop = app.loop
self.workers = {}
self.state = {}
self._start()
def _start(self):
for name, service in self.cfg.SERVICES.items():
# spawn a service worker
serv_worker = ServiceWorker(self, name, service)
self.workers[name] = serv_worker
self.state[name] = False

32
elstat/worker.py Normal file
View File

@ -0,0 +1,32 @@
import asyncio
import logging
from .consts import ADAPTERS
class ServiceWorker:
def __init__(self, manager, name, service):
self.name = name
self.manager = manager
self.service = service
self.adapter = ADAPTERS[service['adapter']]
self.log = logging.getLogger(f'elstat.service.{name}')
self._start()
async def work(self):
res = await self.adapter.query(self, self.service['adapter_args'])
return res
async def _work_loop(self):
try:
while True:
self.log.info(f'polling {self.name}')
await self.work()
await asyncio.sleep(self.service['poll'])
except Exception:
self.log.exception('fail on poll')
def _start(self):
self.log.info(f'starting work loop for {self.name}')
self.manager.loop.create_task(self._work_loop())

2
run.py
View File

@ -7,6 +7,7 @@ from sanic_cors import CORS
from sanic.exceptions import NotFound, FileNotFound
import config
from elstat import manager
logging.basicConfig(level=logging.INFO)
app = Sanic()
@ -18,6 +19,7 @@ CORS(app, automatic_options=True)
async def _app_start(refapp, loop):
refapp.session = aiohttp.ClientSession(loop=loop)
refapp.conn = sqlite3.connect('elstat.db')
refapp.serv = manager.ServiceManager(app)
@app.listener('after_server_stop')