import asyncio
import json
import re
import shlex
import time

# Matches a line of ping output that reports " 0% packet loss".
PING_RGX = re.compile(r'(.+)( 0% packet loss)(.+)', re.I | re.M)

# Captures the round-trip time (group 1) from "time=<number> ms".
PING_LATENCY_RGX = re.compile(r'time\=(\d+(\.\d+)?) ms', re.M)

# Captures the text following "icmp_seq=<n> " (group 2).
# NOTE(review): ``[^time]`` is a character class, not a negative lookahead;
# it rejects any message whose first character is one of t, i, m or e
# (which is what keeps "ttl=... time=..." reply lines from matching).
# Left unchanged to preserve the existing matching behaviour.
PING_ERROR_RGX = re.compile(r'icmp_seq\=(\d+)\ ([^time].*)$', re.M)


class Adapter:
    """Base class for adapters.

    Subclasses override ``spec['db']`` with a tuple of column names and
    implement :meth:`query`.
    """

    spec = {
        'db': None,
    }

    @classmethod
    async def query(cls, _worker, _adp_args: dict) -> dict:
        """Main query function.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    @classmethod
    def _construct(cls, *args) -> dict:
        """Map positional values onto the adapter's columns.

        The first entry of ``cls.spec['db']`` is skipped; the remaining
        column names are paired, in order, with ``args``. When more values
        than columns are given, the last value is stored under ``'error'``.

        Raises:
            IndexError: if fewer values than columns are supplied.
        """
        columns = cls.spec['db'][1:]
        base = {col: args[idx] for idx, col in enumerate(columns)}

        # if there's an extra arg, treat it as the error field
        if len(args) > len(columns):
            base['error'] = args[-1]

        return base


class PingAdapter(Adapter):
    """Ping the given address and report if any packet loss happened."""

    spec = {
        'db': ('timestamp', 'status', 'latency')
    }

    @staticmethod
    def _parse_latency(out: str):
        """Return the latency in ms found in ping output, or 0 if absent."""
        match = PING_LATENCY_RGX.search(out)
        if match is None:
            return 0

        num = match.group(1)
        try:
            return int(num)
        except ValueError:
            # Fractional value; the regex guarantees float() can parse it.
            # Clamped to a minimum of 1 -- presumably so a sub-millisecond
            # reply is not confused with the 0 used when no time was found.
            return max(float(num), 1)

    @classmethod
    async def query(cls, worker, adp_args: dict) -> dict:
        """Run ``ping -c 1`` against ``adp_args['address']``.

        Returns the dict built by ``_construct`` from the alive flag and
        latency, plus an error message when the host did not answer.
        Failures are logged through ``worker.log``.
        """
        # Quote the address so it cannot inject extra shell syntax.
        address = shlex.quote(str(adp_args["address"]))
        process = await asyncio.create_subprocess_shell(
            f'ping -c 1 {address}',
            stderr=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )

        stdout, stderr = await process.communicate()
        # errors='replace' keeps a stray non-UTF-8 byte in ping's output
        # from aborting the whole check.
        out = (stdout.decode('utf-8', errors='replace')
               + stderr.decode('utf-8', errors='replace'))

        alive = PING_RGX.search(out) is not None
        latency = cls._parse_latency(out)

        if alive:
            # we don't need to keep sending err_msg on alive scenarios
            return cls._construct(alive, latency)

        err = PING_ERROR_RGX.search(out)
        err_msg = err.group(2) if err else 'packet lost'

        # only log on errors
        worker.log.info(f'alive={alive} latency={latency}ms '
                        f'err={err_msg!r} out={out}')

        return cls._construct(alive, latency, err_msg)


class HttpAdapter(Adapter):
    """Adapter to check if a certain URL is giving 200."""

    spec = {
        'db': ('timestamp', 'status', 'latency')
    }

    @classmethod
    def get_phrase(cls, status: int) -> str:
        """Get a string representing the error given by the http status code.

        The phrases are read from ``./status.json`` (relative to the current
        working directory) on every call; codes missing from that file yield
        ``'Unknown status'``.
        """
        with open('./status.json', 'r', encoding='utf-8') as status_file:
            statuses = json.load(status_file)

        return statuses.get(str(status), 'Unknown status')

    @classmethod
    async def query(cls, worker, adp_args: dict) -> dict:
        """GET ``adp_args['url']`` and report success and latency.

        A response status of 200 or 204 counts as success. On any other
        status the latency is reported as 0 and an error string with the
        status code and its phrase is included.
        """
        # yes, lots of attributes
        session = worker.manager.app.session

        t_start = time.monotonic()
        # NOTE(review): the response is never explicitly released/closed;
        # if this session is an aiohttp ClientSession, consider
        # ``async with session.get(...)`` -- confirm the session type first.
        resp = await session.get(f'{adp_args["url"]}')
        t_end = time.monotonic()

        # handle both 200 and 204
        succ = resp.status in (200, 204)

        # drop latency to 0 to signal a non-success
        latency = round((t_end - t_start) * 1000) if succ else 0

        if succ:
            return cls._construct(succ, latency)

        # only log on errors
        worker.log.info(f'status={resp.status} latency={latency}ms')

        # construct error message from http error code
        status_phrase = cls.get_phrase(resp.status)
        err_str = f'http status {resp.status} - {status_phrase}'
        return cls._construct(succ, latency, err_str)