2018-07-08 03:38:14 +00:00
|
|
|
import logging
|
2018-07-11 20:00:39 +00:00
|
|
|
import json
|
2018-07-08 03:38:14 +00:00
|
|
|
|
2018-07-11 06:42:31 +00:00
|
|
|
from typing import List, Dict, Any
|
2018-07-10 23:03:07 +00:00
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
from .consts import ADAPTERS
|
2018-07-07 23:51:43 +00:00
|
|
|
from .worker import ServiceWorker
|
|
|
|
|
2018-07-11 20:00:39 +00:00
|
|
|
from .blueprints.streaming import OP
|
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
_COLUMNS = {
|
|
|
|
'timestamp': 'timestamp bigint',
|
|
|
|
'status': 'status bool',
|
|
|
|
'latency': 'latency bigint',
|
|
|
|
}
|
|
|
|
|
2018-07-07 23:51:43 +00:00
|
|
|
|
|
|
|
class ServiceManager:
|
|
|
|
def __init__(self, app):
|
|
|
|
self.app = app
|
|
|
|
self.cfg = app.cfg
|
|
|
|
self.conn = app.conn
|
|
|
|
self.loop = app.loop
|
|
|
|
|
|
|
|
self.workers = {}
|
|
|
|
self.state = {}
|
2018-07-10 23:03:07 +00:00
|
|
|
self.subscribers = {}
|
2018-07-11 20:00:39 +00:00
|
|
|
self._websockets = {}
|
2018-07-07 23:51:43 +00:00
|
|
|
|
|
|
|
self._start()
|
|
|
|
|
2018-07-08 03:38:14 +00:00
|
|
|
def _make_db_table(self, name: str, service: dict):
|
|
|
|
adapter = ADAPTERS[service['adapter']]
|
|
|
|
|
|
|
|
columnstr = map(_COLUMNS.get, adapter.spec['db'])
|
|
|
|
columnstr = ',\n'.join(columnstr)
|
|
|
|
|
|
|
|
log.info(f'Making table for {name}')
|
|
|
|
self.conn.executescript(f"""
|
|
|
|
CREATE TABLE IF NOT EXISTS {name} (
|
|
|
|
{columnstr}
|
|
|
|
);
|
|
|
|
""")
|
|
|
|
|
2018-07-14 02:20:09 +00:00
|
|
|
self.conn.executescript("""
|
|
|
|
CREATE TABLE IF NOT EXISTS incidents (
|
|
|
|
id bigint PRIMARY KEY,
|
|
|
|
incident_type text,
|
|
|
|
title text,
|
|
|
|
content text,
|
|
|
|
ongoing bool,
|
|
|
|
start_timestamp bigint,
|
|
|
|
end_timestamp bigint
|
|
|
|
);
|
|
|
|
|
|
|
|
CREATE TABLE IF NOT EXISTS incident_stages (
|
|
|
|
parent_id bigint REFERENCES incidents (id) NOT NULL,
|
|
|
|
timestamp bigint,
|
|
|
|
title text,
|
|
|
|
content text,
|
|
|
|
PRIMARY KEY (parent_id)
|
|
|
|
);
|
|
|
|
""")
|
|
|
|
|
2018-07-11 06:42:31 +00:00
|
|
|
def _check(self, columns: tuple, field: str, worker_name: str):
|
|
|
|
chan_name = f'{field}:{worker_name}'
|
2018-07-11 05:48:00 +00:00
|
|
|
|
|
|
|
if field in columns and chan_name not in self.subscribers:
|
|
|
|
self.subscribers[chan_name] = []
|
|
|
|
log.info(f'Created channel {chan_name}')
|
|
|
|
|
|
|
|
def _create_channels(self, worker):
|
|
|
|
columns = worker.adapter.spec['db']
|
|
|
|
|
2018-07-11 06:42:31 +00:00
|
|
|
self._check(columns, 'status', worker.name)
|
|
|
|
self._check(columns, 'latency', worker.name)
|
2018-07-11 05:48:00 +00:00
|
|
|
|
2018-07-07 23:51:43 +00:00
|
|
|
def _start(self):
|
|
|
|
for name, service in self.cfg.SERVICES.items():
|
2018-07-08 03:38:14 +00:00
|
|
|
self._make_db_table(name, service)
|
|
|
|
|
2018-07-07 23:51:43 +00:00
|
|
|
# spawn a service worker
|
|
|
|
serv_worker = ServiceWorker(self, name, service)
|
|
|
|
self.workers[name] = serv_worker
|
2018-07-09 05:48:44 +00:00
|
|
|
self.state[name] = None
|
2018-07-10 23:03:07 +00:00
|
|
|
|
2018-07-11 05:48:00 +00:00
|
|
|
self._create_channels(serv_worker)
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
for worker in self.workers.values():
|
|
|
|
worker.stop()
|
|
|
|
|
2018-07-11 21:23:47 +00:00
|
|
|
def subscribe(self, channels: List[str], websocket) -> List[str]:
|
2018-07-10 23:03:07 +00:00
|
|
|
"""Subscribe to a list of channels."""
|
2018-07-11 21:23:47 +00:00
|
|
|
wid = websocket.client_id
|
2018-07-10 23:03:07 +00:00
|
|
|
subscribed = []
|
|
|
|
|
2018-07-11 20:00:39 +00:00
|
|
|
self._websockets[websocket.client_id] = websocket
|
|
|
|
|
2018-07-10 23:03:07 +00:00
|
|
|
for chan in channels:
|
|
|
|
try:
|
2018-07-11 21:23:47 +00:00
|
|
|
self.subscribers[chan].append(wid)
|
2018-07-11 20:00:39 +00:00
|
|
|
subscribed.append(chan)
|
2018-07-11 21:23:47 +00:00
|
|
|
log.info(f'Subscribed {wid} to {chan}')
|
2018-07-10 23:03:07 +00:00
|
|
|
except KeyError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
return subscribed
|
|
|
|
|
2018-07-11 21:23:47 +00:00
|
|
|
def unsubscribe(self, channels: List[str], websocket) -> List[str]:
|
|
|
|
wid = websocket.client_id
|
|
|
|
unsub = []
|
|
|
|
|
|
|
|
for chan in channels:
|
|
|
|
try:
|
|
|
|
self.subscribers[chan].remove(wid)
|
|
|
|
unsub.append(chan)
|
|
|
|
log.info(f'Unsubscribed {wid} from {chan}')
|
|
|
|
except (KeyError, ValueError):
|
|
|
|
pass
|
|
|
|
|
|
|
|
return unsub
|
|
|
|
|
2018-07-10 23:03:07 +00:00
|
|
|
def unsub_all(self, websocket):
|
|
|
|
"""Unsubscribe a websocket from all known channels."""
|
|
|
|
unsub = []
|
|
|
|
|
|
|
|
for chan, subs in self.subscribers.items():
|
|
|
|
try:
|
|
|
|
subs.remove(websocket.client_id)
|
|
|
|
unsub.append(chan)
|
|
|
|
except ValueError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
log.info(f'unsubscribed {websocket.client_id} from {unsub}')
|
2018-07-11 20:00:39 +00:00
|
|
|
try:
|
|
|
|
self._websockets.pop(websocket.client_id)
|
|
|
|
except KeyError:
|
|
|
|
pass
|
2018-07-10 23:03:07 +00:00
|
|
|
return unsub
|
2018-07-11 06:42:31 +00:00
|
|
|
|
2018-07-11 20:00:39 +00:00
|
|
|
def _raw_send(self, websocket, channel: str, data: Any):
|
|
|
|
if websocket is None:
|
|
|
|
return
|
|
|
|
|
|
|
|
loop = self.app.loop
|
|
|
|
|
|
|
|
return loop.create_task(websocket.send(json.dumps({
|
|
|
|
'op': OP.DATA,
|
|
|
|
'c': channel,
|
|
|
|
'd': data,
|
|
|
|
})))
|
|
|
|
|
2018-07-11 06:42:31 +00:00
|
|
|
def publish(self, channel: str, data: Any):
|
2018-07-11 20:00:39 +00:00
|
|
|
ws_ids = self.subscribers[channel]
|
|
|
|
websockets = map(self._websockets.get, ws_ids)
|
|
|
|
|
|
|
|
def _send(websocket):
|
|
|
|
return self._raw_send(websocket, channel, data)
|
2018-07-11 06:42:31 +00:00
|
|
|
|
2018-07-11 20:00:39 +00:00
|
|
|
tasks = map(_send, websockets)
|
|
|
|
return list(tasks)
|