add channel creation
dynamically create channels by adapter's specs. - add cleanup tasks for ServiceManager and ServiceWorker
This commit is contained in:
parent
053762f69f
commit
7e6d9d5402
3 changed files with 32 additions and 4 deletions
|
@ -40,6 +40,20 @@ class ServiceManager:
|
||||||
);
|
);
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
def _check(self, columns: tuple, field: str, chan_name: str = None):
|
||||||
|
if chan_name is None:
|
||||||
|
chan_name = field
|
||||||
|
|
||||||
|
if field in columns and chan_name not in self.subscribers:
|
||||||
|
self.subscribers[chan_name] = []
|
||||||
|
log.info(f'Created channel {chan_name}')
|
||||||
|
|
||||||
|
def _create_channels(self, worker):
|
||||||
|
columns = worker.adapter.spec['db']
|
||||||
|
|
||||||
|
self._check(columns, 'status')
|
||||||
|
self._check(columns, 'latency', f'latency:{worker.name}')
|
||||||
|
|
||||||
def _start(self):
|
def _start(self):
|
||||||
for name, service in self.cfg.SERVICES.items():
|
for name, service in self.cfg.SERVICES.items():
|
||||||
self._make_db_table(name, service)
|
self._make_db_table(name, service)
|
||||||
|
@ -47,9 +61,14 @@ class ServiceManager:
|
||||||
# spawn a service worker
|
# spawn a service worker
|
||||||
serv_worker = ServiceWorker(self, name, service)
|
serv_worker = ServiceWorker(self, name, service)
|
||||||
self.workers[name] = serv_worker
|
self.workers[name] = serv_worker
|
||||||
|
|
||||||
self.state[name] = None
|
self.state[name] = None
|
||||||
|
|
||||||
|
self._create_channels(serv_worker)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
for worker in self.workers.values():
|
||||||
|
worker.stop()
|
||||||
|
|
||||||
def subscribe(self, channels: List[str], websocket):
|
def subscribe(self, channels: List[str], websocket):
|
||||||
"""Subscribe to a list of channels."""
|
"""Subscribe to a list of channels."""
|
||||||
subscribed = []
|
subscribed = []
|
||||||
|
|
|
@ -19,7 +19,9 @@ class ServiceWorker:
|
||||||
async def work(self):
|
async def work(self):
|
||||||
return await self.adapter.query(self, self.service['adapter_args'])
|
return await self.adapter.query(self, self.service['adapter_args'])
|
||||||
|
|
||||||
async def process_work(self, result):
|
async def process_work(self, result: tuple):
|
||||||
|
"""Process given adapter result and insert into
|
||||||
|
the database."""
|
||||||
columns = self.adapter.spec['db']
|
columns = self.adapter.spec['db']
|
||||||
conn = self.manager.conn
|
conn = self.manager.conn
|
||||||
|
|
||||||
|
@ -45,10 +47,16 @@ class ServiceWorker:
|
||||||
await self.process_work(res)
|
await self.process_work(res)
|
||||||
|
|
||||||
await asyncio.sleep(self.service['poll'])
|
await asyncio.sleep(self.service['poll'])
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.log.info('cancelled, stopping')
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception('fail on poll, retrying')
|
self.log.exception('fail on work loop, retrying')
|
||||||
await self._work_loop()
|
await self._work_loop()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the current worker."""
|
||||||
|
self._work_task.cancel()
|
||||||
|
|
||||||
def _start(self):
|
def _start(self):
|
||||||
self.log.info(f'starting work loop for {self.name}')
|
self.log.info(f'starting work loop for {self.name}')
|
||||||
self.manager.loop.create_task(self._work_loop())
|
self._work_task = self.manager.loop.create_task(self._work_loop())
|
||||||
|
|
1
run.py
1
run.py
|
@ -30,6 +30,7 @@ async def _app_start(refapp, loop):
|
||||||
|
|
||||||
@app.listener('after_server_stop')
|
@app.listener('after_server_stop')
|
||||||
async def _app_stop(refapp, _loop):
|
async def _app_stop(refapp, _loop):
|
||||||
|
refapp.manager.close()
|
||||||
refapp.conn.close()
|
refapp.conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue