2018-07-08 03:38:14 +00:00
|
|
|
import asyncio
|
|
|
|
import time
|
|
|
|
import re
|
2018-07-07 23:51:43 +00:00
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
PING_RGX = re.compile(r'(.+)( 0% packet loss)(.+)', re.I | re.M)
|
2018-07-14 02:20:09 +00:00
|
|
|
PING_LATENCY_RGX = re.compile('time\=(\d+(\.\d+)?) ms', re.M)
|
2018-07-07 23:51:43 +00:00
|
|
|
|
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
class Adapter:
|
|
|
|
spec = {
|
|
|
|
'db': None,
|
|
|
|
}
|
|
|
|
|
|
|
|
@classmethod
|
2018-07-15 01:27:06 +00:00
|
|
|
async def query(cls, _worker, _adp_args: dict) -> dict:
|
2018-07-08 03:38:14 +00:00
|
|
|
"""Main query function."""
|
|
|
|
raise NotImplementedError
|
|
|
|
|
2018-07-15 01:27:06 +00:00
|
|
|
@classmethod
|
|
|
|
def _construct(cls, *args) -> dict:
|
|
|
|
columns = cls.spec['db'][1:]
|
|
|
|
return {col: args[idx] for idx, col in enumerate(columns)}
|
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
|
|
|
|
class PingAdapter(Adapter):
|
|
|
|
"""Ping the given address and report if
|
|
|
|
any packet loss happened."""
|
|
|
|
spec = {
|
2018-07-14 02:20:09 +00:00
|
|
|
'db': ('timestamp', 'status', 'latency')
|
2018-07-08 03:38:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
async def query(cls, worker, adp_args: dict):
|
|
|
|
process = await asyncio.create_subprocess_shell(
|
|
|
|
f'ping -c 1 {adp_args["address"]}',
|
|
|
|
stderr=asyncio.subprocess.PIPE,
|
|
|
|
stdout=asyncio.subprocess.PIPE,
|
|
|
|
)
|
|
|
|
|
|
|
|
out, err = map(lambda s: s.decode('utf-8'),
|
|
|
|
await process.communicate())
|
|
|
|
|
2018-07-14 02:20:09 +00:00
|
|
|
out += err
|
|
|
|
|
|
|
|
alive = bool(re.search(PING_RGX, out))
|
|
|
|
latency = PING_LATENCY_RGX.search(out)
|
|
|
|
|
|
|
|
if latency is not None:
|
|
|
|
num = latency.group(1)
|
|
|
|
try:
|
|
|
|
latency = int(num)
|
|
|
|
except ValueError:
|
|
|
|
try:
|
|
|
|
latency = max(float(num), 1)
|
|
|
|
except ValueError:
|
|
|
|
latency = 0
|
|
|
|
else:
|
|
|
|
latency = 0
|
|
|
|
|
2018-07-15 01:27:06 +00:00
|
|
|
worker.log.info(f'{worker.name}: alive={alive} latency={latency}ms')
|
|
|
|
return cls._construct(alive, latency)
|
2018-07-08 03:38:14 +00:00
|
|
|
|
|
|
|
|
2018-07-09 05:48:44 +00:00
|
|
|
class HttpAdapter(Adapter):
|
2018-07-08 03:38:14 +00:00
|
|
|
"""Adapter to check if a certain
|
2018-07-09 05:48:44 +00:00
|
|
|
URL is giving 200."""
|
2018-07-08 03:38:14 +00:00
|
|
|
spec = {
|
|
|
|
'db': ('timestamp', 'status', 'latency')
|
|
|
|
}
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
async def query(cls, worker, adp_args: dict):
|
|
|
|
# yes, lots of attributes
|
|
|
|
session = worker.manager.app.session
|
|
|
|
|
|
|
|
t_start = time.monotonic()
|
2018-07-09 05:48:44 +00:00
|
|
|
resp = await session.get(f'{adp_args["url"]}')
|
2018-07-08 03:38:14 +00:00
|
|
|
t_end = time.monotonic()
|
|
|
|
|
|
|
|
latency = round((t_end - t_start) * 1000)
|
|
|
|
|
|
|
|
worker.log.info(f'{worker.name}: status={resp.status} '
|
|
|
|
f'latency={latency}ms')
|
|
|
|
|
|
|
|
if resp.status == 200:
|
2018-07-15 01:27:06 +00:00
|
|
|
return cls._construct(True, latency)
|
2018-07-08 03:38:14 +00:00
|
|
|
|
|
|
|
# use 0ms drops as failures
|
2018-07-15 01:27:06 +00:00
|
|
|
return cls._construct(False, 0)
|