diff --git a/elstat/manager.py b/elstat/manager.py index 19c3353..4b95260 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -40,6 +40,20 @@ class ServiceManager: ); """) + def _check(self, columns: tuple, field: str, chan_name: str = None): + if chan_name is None: + chan_name = field + + if field in columns and chan_name not in self.subscribers: + self.subscribers[chan_name] = [] + log.info(f'Created channel {chan_name}') + + def _create_channels(self, worker): + columns = worker.adapter.spec['db'] + + self._check(columns, 'status') + self._check(columns, 'latency', f'latency:{worker.name}') + def _start(self): for name, service in self.cfg.SERVICES.items(): self._make_db_table(name, service) @@ -47,9 +61,14 @@ class ServiceManager: # spawn a service worker serv_worker = ServiceWorker(self, name, service) self.workers[name] = serv_worker - self.state[name] = None + self._create_channels(serv_worker) + + def close(self): + for worker in self.workers.values(): + worker.stop() + def subscribe(self, channels: List[str], websocket): """Subscribe to a list of channels.""" subscribed = [] diff --git a/elstat/worker.py b/elstat/worker.py index 3cde172..b433414 100644 --- a/elstat/worker.py +++ b/elstat/worker.py @@ -19,7 +19,9 @@ class ServiceWorker: async def work(self): return await self.adapter.query(self, self.service['adapter_args']) - async def process_work(self, result): + async def process_work(self, result: tuple): + """Process given adapter result and insert into + the database.""" columns = self.adapter.spec['db'] conn = self.manager.conn @@ -45,10 +47,16 @@ class ServiceWorker: await self.process_work(res) await asyncio.sleep(self.service['poll']) + except asyncio.CancelledError: + self.log.info('cancelled, stopping') except Exception: - self.log.exception('fail on poll, retrying') + self.log.exception('fail on work loop, retrying') await self._work_loop() + def stop(self): + """Stop the current worker.""" + self._work_task.cancel() + def _start(self): self.log.info(f'starting work loop for {self.name}') - self.manager.loop.create_task(self._work_loop()) + self._work_task = self.manager.loop.create_task(self._work_loop()) diff --git a/run.py b/run.py index bc56b93..8e5e4e1 100644 --- a/run.py +++ b/run.py @@ -30,6 +30,7 @@ async def _app_start(refapp, loop): @app.listener('after_server_stop') async def _app_stop(refapp, _loop): + refapp.manager.close() refapp.conn.close()