change adapter response type to dict
this simplifies a lot of code since tuples are immutable. - adapters: add Adapter._construct - worker: set status state to False on worker crash
This commit is contained in:
parent
ecf6234cfe
commit
83b2d88311
3 changed files with 25 additions and 15 deletions
|
@ -12,10 +12,15 @@ class Adapter:
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def query(cls, _worker, _adp_args) -> tuple:
|
async def query(cls, _worker, _adp_args: dict) -> dict:
|
||||||
"""Main query function."""
|
"""Main query function."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _construct(cls, *args) -> dict:
|
||||||
|
columns = cls.spec['db'][1:]
|
||||||
|
return {col: args[idx] for idx, col in enumerate(columns)}
|
||||||
|
|
||||||
|
|
||||||
class PingAdapter(Adapter):
|
class PingAdapter(Adapter):
|
||||||
"""Ping the given address and report if
|
"""Ping the given address and report if
|
||||||
|
@ -52,8 +57,8 @@ class PingAdapter(Adapter):
|
||||||
else:
|
else:
|
||||||
latency = 0
|
latency = 0
|
||||||
|
|
||||||
worker.log.info(f'{worker.name}: alive? {alive} latency? {latency}ms')
|
worker.log.info(f'{worker.name}: alive={alive} latency={latency}ms')
|
||||||
return (alive, latency)
|
return cls._construct(alive, latency)
|
||||||
|
|
||||||
|
|
||||||
class HttpAdapter(Adapter):
|
class HttpAdapter(Adapter):
|
||||||
|
@ -78,7 +83,7 @@ class HttpAdapter(Adapter):
|
||||||
f'latency={latency}ms')
|
f'latency={latency}ms')
|
||||||
|
|
||||||
if resp.status == 200:
|
if resp.status == 200:
|
||||||
return True, latency
|
return cls._construct(True, latency)
|
||||||
|
|
||||||
# use 0ms drops as failures
|
# use 0ms drops as failures
|
||||||
return False, 0
|
return cls._construct(False, 0)
|
||||||
|
|
|
@ -11,13 +11,9 @@ def get_status(manager):
|
||||||
if state is None:
|
if state is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# timestamp will always be the first
|
res[name] = state
|
||||||
worker = manager.workers[name]
|
|
||||||
columns = worker.adapter.spec['db'][1:]
|
|
||||||
res[name] = {}
|
|
||||||
for key, val in zip(columns, state):
|
|
||||||
res[name][key] = val
|
|
||||||
|
|
||||||
|
worker = manager.workers[name]
|
||||||
res[name]['description'] = worker.service['description']
|
res[name]['description'] = worker.service['description']
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
|
@ -19,7 +19,7 @@ class ServiceWorker:
|
||||||
async def work(self):
|
async def work(self):
|
||||||
return await self.adapter.query(self, self.service['adapter_args'])
|
return await self.adapter.query(self, self.service['adapter_args'])
|
||||||
|
|
||||||
async def process_work(self, result: tuple):
|
async def process_work(self, result: dict):
|
||||||
"""Process given adapter result and insert into
|
"""Process given adapter result and insert into
|
||||||
the database."""
|
the database."""
|
||||||
columns = self.adapter.spec['db']
|
columns = self.adapter.spec['db']
|
||||||
|
@ -33,7 +33,12 @@ class ServiceWorker:
|
||||||
VALUES ({args_str})
|
VALUES ({args_str})
|
||||||
"""
|
"""
|
||||||
|
|
||||||
conn.execute(query, (timestamp, ) + result)
|
args = []
|
||||||
|
for col in columns[1:]:
|
||||||
|
val = result[col]
|
||||||
|
args.append(val)
|
||||||
|
|
||||||
|
conn.execute(query, (timestamp, ) + tuple(args))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
await self._dispatch_work(columns, timestamp, result)
|
await self._dispatch_work(columns, timestamp, result)
|
||||||
|
@ -42,8 +47,8 @@ class ServiceWorker:
|
||||||
prechan = columns[1:]
|
prechan = columns[1:]
|
||||||
chans = [f'{chan}:{self.name}' for chan in prechan]
|
chans = [f'{chan}:{self.name}' for chan in prechan]
|
||||||
|
|
||||||
for idx, chan in enumerate(chans):
|
for col, chan in zip(prechan, chans):
|
||||||
self.manager.publish(chan, (timestamp, result[idx]))
|
self.manager.publish(chan, [timestamp, result[col]])
|
||||||
|
|
||||||
async def _work_loop(self):
|
async def _work_loop(self):
|
||||||
try:
|
try:
|
||||||
|
@ -60,6 +65,10 @@ class ServiceWorker:
|
||||||
self.log.info('cancelled, stopping')
|
self.log.info('cancelled, stopping')
|
||||||
except Exception:
|
except Exception:
|
||||||
self.log.exception('fail on work loop, retrying')
|
self.log.exception('fail on work loop, retrying')
|
||||||
|
try:
|
||||||
|
self.manager.state[self.name]['status'] = False
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
await self._work_loop()
|
await self._work_loop()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
Loading…
Reference in a new issue