From 83b2d8831151d606493a1d9a598a9c197321f8cb Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Sat, 14 Jul 2018 22:27:06 -0300 Subject: [PATCH] change adapter response type to dict this simplifies a lot of code since tuples are immutable. - adapters: add Adapter._construct - worker: set status state to False on worker crash --- elstat/adapters.py | 15 ++++++++++----- elstat/blueprints/api.py | 8 ++------ elstat/worker.py | 17 +++++++++++++---- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/elstat/adapters.py b/elstat/adapters.py index 12ab0e0..920fd77 100644 --- a/elstat/adapters.py +++ b/elstat/adapters.py @@ -12,10 +12,15 @@ class Adapter: } @classmethod - async def query(cls, _worker, _adp_args) -> tuple: + async def query(cls, _worker, _adp_args: dict) -> dict: """Main query function.""" raise NotImplementedError + @classmethod + def _construct(cls, *args) -> dict: + columns = cls.spec['db'][1:] + return {col: args[idx] for idx, col in enumerate(columns)} + class PingAdapter(Adapter): """Ping the given address and report if @@ -52,8 +57,8 @@ class PingAdapter(Adapter): else: latency = 0 - worker.log.info(f'{worker.name}: alive? {alive} latency? {latency}ms') - return (alive, latency) + worker.log.info(f'{worker.name}: alive={alive} latency={latency}ms') + return cls._construct(alive, latency) class HttpAdapter(Adapter): @@ -78,7 +83,7 @@ class HttpAdapter(Adapter): f'latency={latency}ms') if resp.status == 200: - return True, latency + return cls._construct(True, latency) # use 0ms drops as failures - return False, 0 + return cls._construct(False, 0) diff --git a/elstat/blueprints/api.py b/elstat/blueprints/api.py index 8287c25..cf6678f 100644 --- a/elstat/blueprints/api.py +++ b/elstat/blueprints/api.py @@ -11,13 +11,9 @@ def get_status(manager): if state is None: continue - # timestamp will always be the first - worker = manager.workers[name] - columns = worker.adapter.spec['db'][1:] - res[name] = {} - for key, val in zip(columns, state): - res[name][key] = val + res[name] = state + worker = manager.workers[name] res[name]['description'] = worker.service['description'] return res diff --git a/elstat/worker.py b/elstat/worker.py index c91201c..470f34e 100644 --- a/elstat/worker.py +++ b/elstat/worker.py @@ -19,7 +19,7 @@ class ServiceWorker: async def work(self): return await self.adapter.query(self, self.service['adapter_args']) - async def process_work(self, result: tuple): + async def process_work(self, result: dict): """Process given adapter result and insert into the database.""" columns = self.adapter.spec['db'] @@ -33,7 +33,12 @@ class ServiceWorker: VALUES ({args_str}) """ - conn.execute(query, (timestamp, ) + result) + args = [] + for col in columns[1:]: + val = result[col] + args.append(val) + + conn.execute(query, (timestamp, ) + tuple(args)) conn.commit() await self._dispatch_work(columns, timestamp, result) @@ -42,8 +47,8 @@ class ServiceWorker: prechan = columns[1:] chans = [f'{chan}:{self.name}' for chan in prechan] - for idx, chan in enumerate(chans): - self.manager.publish(chan, (timestamp, result[idx])) + for col, chan in zip(prechan, chans): + self.manager.publish(chan, [timestamp, result[col]]) async def _work_loop(self): try: @@ -60,6 +65,10 @@ class ServiceWorker: self.log.info('cancelled, stopping') except Exception: self.log.exception('fail on work loop, retrying') + try: + self.manager.state[self.name]['status'] = False + except KeyError: + pass await self._work_loop() def stop(self):