From dcadfd4c762f51c918fe47ab9f35754108b6433f Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Tue, 17 Jul 2018 22:59:11 -0300 Subject: [PATCH] blueprints.incidents: add api base - add getting a single incident, getting multiple (with pages), and creating an incident - blueprints.streaming: add incident related OP codes - consts: add IncidentType - manager: add incidents channel, add publish_incident - elstat: add snowflake.py --- elstat/blueprints/incidents.py | 159 ++++++++++++++++++++++++--------- elstat/blueprints/streaming.py | 11 +++ elstat/consts.py | 6 ++ elstat/manager.py | 25 +++++- elstat/snowflake.py | 95 ++++++++++++++++++++ 5 files changed, 253 insertions(+), 43 deletions(-) create mode 100644 elstat/snowflake.py diff --git a/elstat/blueprints/incidents.py b/elstat/blueprints/incidents.py index 55fdc68..33d540f 100644 --- a/elstat/blueprints/incidents.py +++ b/elstat/blueprints/incidents.py @@ -1,15 +1,67 @@ +import time import datetime from sanic import Blueprint, response from .decorators import auth_route +from .streaming import OP +from ..snowflake import get_snowflake + bp = Blueprint(__name__) -# TODO: pages -@bp.get('/api/incidents') -async def get_incidents(request): +def fetch_stages(conn, incident_id: int) -> list: + """Fetch all the stages for an incident""" + cur = conn.cursor() + + cur.execute(""" + SELECT title, content, timestamp + FROM incident_stages + WHERE parent_id = ? + ORDER BY timestamp ASC + """, (incident_id,)) + + stage_rows = cur.fetchall() + + def stage_obj(stage_row) -> dict: + """give a stage dict, given the stage row.""" + return { + 'title': stage_row[0], + 'content': stage_row[1], + 'created_at': datetime.datetime.fromtimestamp( + stage_row[3]).isoformat(), + } + + return list(map(stage_obj, stage_rows)) + + +def incident_dict(conn, row) -> dict: + """make an incident dict, given incident row.""" + start_timestamp = datetime.datetime.fromtimestamp(row[5]).isoformat() + + if row[6]: + end_timestamp = datetime.datetime.fromtimestamp(row[6]).isoformat() + else: + end_timestamp = None + + return { + 'id': str(row[0]), + 'type': row[1], + 'title': row[2], + 'content': row[3], + 'ongoing': row[4], + + 'start_date': start_timestamp, + 'end_date': end_timestamp, + + 'stages': fetch_stages(conn, row[0]) + } + + +@bp.get('/api/incidents/current') +async def get_current_incident(request): + """Get the current incident, if any.""" manager = request.app.manager cur = manager.conn.cursor() @@ -17,7 +69,39 @@ async def get_incidents(request): SELECT id, incident_type, title, content, ongoing, start_timestamp, end_timestamp FROM incidents + ORDER BY id ASC + LIMIT 1 + """) + + rows = cur.fetchall() + + try: + row = next(iter(rows)) + drow = incident_dict(manager.conn, row) + except StopIteration: + row = None + drow = {} + + return response.json({ + 'all_good': not drow.get('ongoing'), + 'current_incident': None if drow == {} else drow + }) + + +@bp.get('/api/incidents/') +async def get_incidents(request, page: int): + """Get a list of incidents.""" + manager = request.app.manager + cur = manager.conn.cursor() + + cur.execute(f""" + SELECT id, incident_type, title, content, ongoing, + start_timestamp, end_timestamp + FROM incidents ORDER BY id DESC + + LIMIT 10 + OFFSET ({page} * 10) """) rows = cur.fetchall() @@ -25,44 +109,9 @@ async def get_incidents(request): res = [] for row in rows: - cur = manager.conn.cursor() - - cur.execute(""" - SELECT title, content - FROM incident_stages - WHERE parent_id = ? - ORDER BY timestamp ASC - """, (row[0],)) - - stage_rows = cur.fetchall() - def stage_obj(stage_row): - return { - 'title': stage_row[0], - 'content': stage_row[1], - } - - stages = list(map(stage_obj, stage_rows)) - start_timestamp = datetime.datetime.fromtimestamp(row[5]) - end_timestamp = datetime.datetime.fromtimestamp(row[6]) - - res.append({ - 'id': str(row[0]), - 'type': row[1], - 'title': row[2], - 'content': row[3], - 'ongoing': row[4], - 'start_timestamp': start_timestamp.isoformat(), - 'end_timestamp': end_timestamp.isoformat(), - 'stages': stages - }) - - try: - first = next(iter(res)) - except StopIteration: - first = {'ongoing': False} + res.append(incident_dict(manager.conn, row)) return response.json({ - 'all_good': not first['ongoing'], 'incidents': res, }) @@ -70,4 +119,34 @@ async def get_incidents(request): @bp.put('/api/incidents') @auth_route async def create_incident(request): - return response.text('im gay') + incident = request.json + manager = request.app.manager + + incident_id = get_snowflake() + start_timestamp = time.time() + + manager.conn.execute(""" + INSERT INTO incidents (id, incident_type, title, content, + ongoing, start_timestamp, end_timestamp) + VALUES (?, ?, ?, ?, true, ?, NULL) + """, ( + incident_id, + incident['type'], + incident['title'], + incident['content'], + start_timestamp, + )) + manager.conn.commit() + + d_incident = incident_dict(manager.conn, ( + incident_id, + incident['type'], + incident['title'], + incident['content'], + True, + start_timestamp, + None + )) + + manager.publish_incident(OP.INCIDENT_NEW, d_incident) + return response.json(d_incident) diff --git a/elstat/blueprints/streaming.py b/elstat/blueprints/streaming.py index 105a52d..70411de 100644 --- a/elstat/blueprints/streaming.py +++ b/elstat/blueprints/streaming.py @@ -17,6 +17,17 @@ class OP: UNSUBSCRIBED = 2 DATA = 3 + # incident specific + INCIDENT_NEW = 4 + + # when a title of incident updates or smth + INC_UPDATE = 5 + + # when new stage comes up or + # a current one is updated + INC_UPDATE_STAGE = 6 + INCIDENT_CLOSE = 7 + class ErrorCodes: INVALID_PAYLOAD = 4200 diff --git a/elstat/consts.py b/elstat/consts.py index 6bf276e..662dcfc 100644 --- a/elstat/consts.py +++ b/elstat/consts.py @@ -5,3 +5,9 @@ ADAPTERS = { 'http': HttpAdapter, 'ping': PingAdapter, } + + +class IncidentType: + OUTAGE = 'outage' + PARTIAL_OUTAGE = 'partial_outage' + DEGRADED = 'degraded_service' diff --git a/elstat/manager.py b/elstat/manager.py index 3521fcf..a4c6011 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -78,6 +78,8 @@ class ServiceManager: self._check(columns, 'latency', worker.name) def _start(self): + self.subscribers['incidents'] = [] + for name, service in self.cfg.SERVICES.items(): self._make_db_table(name, service) @@ -141,17 +143,21 @@ class ServiceManager: pass return unsub - def _raw_send(self, websocket, channel: str, data: Any): + def _ws_send(self, websocket, data: Any): if websocket is None: return loop = self.app.loop - return loop.create_task(websocket.send(json.dumps({ + data = json.dumps(data) + return loop.create_task(websocket.send(data)) + + def _raw_send(self, websocket, channel: str, data: Any): + return self._ws_send(websocket, { 'op': OP.DATA, 'c': channel, 'd': data, - }))) + }) def publish(self, channel: str, data: Any): ws_ids = self.subscribers[channel] @@ -162,3 +168,16 @@ class ServiceManager: tasks = map(_send, websockets) return list(tasks) + + def publish_incident(self, op: int, data: Any): + ws_ids = self.subscribers['incidents'] + websockets = map(self._websockets.get, ws_ids) + + def _send(websocket): + return self._ws_send(websocket, { + 'op': op, + 'd': data, + }) + + tasks = map(_send, websockets) + return list(tasks) diff --git a/elstat/snowflake.py b/elstat/snowflake.py new file mode 100644 index 0000000..f1c473f --- /dev/null +++ b/elstat/snowflake.py @@ -0,0 +1,95 @@ +""" +snowflake.py - snowflake helper functions + + These functions generate discord-like snowflakes. + File brought in from + litecord-reference(https://github.com/lnmds/litecord-reference) +""" +import time +import hashlib +import os +import base64 + +# encoded in ms +EPOCH = 1420070400000 + +# internal state +_generated_ids = 0 +PROCESS_ID = 1 +WORKER_ID = 1 + +Snowflake = int + + +def get_invite_code() -> str: + """Get a random invite code.""" + random_stuff = hashlib.sha512(os.urandom(1024)).digest() + code = base64.urlsafe_b64encode(random_stuff).decode().replace('=', '5') \ + .replace('_', 'W').replace('-', 'm') + return code[:6] + + +def _snowflake(timestamp: int) -> Snowflake: + """Get a snowflake from a specific timestamp + + This function relies on modifying internal variables + to generate unique snowflakes. Because of that every call + to this function will generate a different snowflake, + even with the same timestamp. + + Arguments + --------- + timestamp: int + Timestamp to be feed in to the snowflake algorithm. + This timestamp has to be an UNIX timestamp + with millisecond precision. + """ + # Yes, using global variables aren't the best idea + # Maybe we could distribute the work of snowflake generation + # to actually separated servers? :thinking: + global _generated_ids + + # bits 0-12 encode _generated_ids (size 12) + genid_b = '{0:012b}'.format(_generated_ids) + + # bits 12-17 encode PROCESS_ID (size 5) + procid_b = '{0:05b}'.format(PROCESS_ID) + + # bits 17-22 encode WORKER_ID (size 5) + workid_b = '{0:05b}'.format(WORKER_ID) + + # bits 22-64 encode (timestamp - EPOCH) (size 42) + epochized = timestamp - EPOCH + epoch_b = '{0:042b}'.format(epochized) + + snowflake_b = f'{epoch_b}{workid_b}{procid_b}{genid_b}' + _generated_ids += 1 + + return int(snowflake_b, 2) + + +def snowflake_time(snowflake: Snowflake) -> float: + """Get the UNIX timestamp(with millisecond precision, as a float) + from a specific snowflake. + """ + + # the total size for a snowflake is 64 bits, + # considering it is a string, position 0 to 42 will give us + # the `epochized` variable + snowflake_b = '{0:064b}'.format(snowflake) + epochized_b = snowflake_b[:42] + epochized = int(epochized_b, 2) + + # since epochized is the time *since* the EPOCH + # the unix timestamp will be the time *plus* the EPOCH + timestamp = epochized + EPOCH + + # convert it to seconds + # since we don't want to break the entire + # snowflake interface + return timestamp / 1000 + + +def get_snowflake(): + """Generate a snowflake""" + return _snowflake(int(time.time() * 1000))