incidents: add rest of basic API
- change all ISO timestamp strings to unix timestamps - add patch_incident and new_stage - streaming: remove INC_UPDATE_STAGE - manager: change primary key on instance_stages table This requires a table change across all elstat instances, since we changed the primary key on instance_stages.
This commit is contained in:
parent
dcadfd4c76
commit
9c2dbeaf59
3 changed files with 117 additions and 33 deletions
|
@ -1,5 +1,4 @@
|
||||||
import time
|
import time
|
||||||
import datetime
|
|
||||||
|
|
||||||
from sanic import Blueprint, response
|
from sanic import Blueprint, response
|
||||||
|
|
||||||
|
@ -10,6 +9,27 @@ from ..snowflake import get_snowflake
|
||||||
|
|
||||||
bp = Blueprint(__name__)
|
bp = Blueprint(__name__)
|
||||||
|
|
||||||
|
# since sqlite uses bigints, we make sure
|
||||||
|
# we get the same type being sent over.
|
||||||
|
# (yes, I know sqlite can accept floats
|
||||||
|
# in a bigint column, but having a float
|
||||||
|
# would cause API inconsistency).
|
||||||
|
_time = lambda: int(time.time())
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_incident(conn, incident_id: dict) -> tuple:
|
||||||
|
"""Fetch a single incident's row."""
|
||||||
|
cur = conn.cursor()
|
||||||
|
|
||||||
|
cur.execute("""
|
||||||
|
SELECT id, incident_type, title, content, ongoing,
|
||||||
|
start_timestamp, end_timestamp
|
||||||
|
FROM incidents
|
||||||
|
WHERE id = ?
|
||||||
|
""", (incident_id,))
|
||||||
|
|
||||||
|
return cur.fetchone()
|
||||||
|
|
||||||
|
|
||||||
def fetch_stages(conn, incident_id: int) -> list:
|
def fetch_stages(conn, incident_id: int) -> list:
|
||||||
"""Fetch all the stages for an incident"""
|
"""Fetch all the stages for an incident"""
|
||||||
|
@ -29,22 +49,24 @@ def fetch_stages(conn, incident_id: int) -> list:
|
||||||
return {
|
return {
|
||||||
'title': stage_row[0],
|
'title': stage_row[0],
|
||||||
'content': stage_row[1],
|
'content': stage_row[1],
|
||||||
'created_at': datetime.datetime.fromtimestamp(
|
'created_at': stage_row[2],
|
||||||
stage_row[3]).isoformat(),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return list(map(stage_obj, stage_rows))
|
return list(map(stage_obj, stage_rows))
|
||||||
|
|
||||||
|
|
||||||
|
def publish_update(manager, incident_id: int):
|
||||||
|
"""Publish an update to an incident.
|
||||||
|
|
||||||
|
This makes sure the data being published is the latest
|
||||||
|
by requering the database.
|
||||||
|
"""
|
||||||
|
full = fetch_dict(manager.conn, incident_id)
|
||||||
|
manager.publish_incident(OP.INC_UPDATE, full)
|
||||||
|
|
||||||
|
|
||||||
def incident_dict(conn, row) -> dict:
|
def incident_dict(conn, row) -> dict:
|
||||||
"""make an incident dict, given incident row."""
|
"""make an incident dict, given incident row."""
|
||||||
start_timestamp = datetime.datetime.fromtimestamp(row[5]).isoformat()
|
|
||||||
|
|
||||||
if row[6]:
|
|
||||||
end_timestamp = datetime.datetime.fromtimestamp(row[6]).isoformat()
|
|
||||||
else:
|
|
||||||
end_timestamp = None
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'id': str(row[0]),
|
'id': str(row[0]),
|
||||||
'type': row[1],
|
'type': row[1],
|
||||||
|
@ -52,13 +74,19 @@ def incident_dict(conn, row) -> dict:
|
||||||
'content': row[3],
|
'content': row[3],
|
||||||
'ongoing': row[4],
|
'ongoing': row[4],
|
||||||
|
|
||||||
'start_date': start_timestamp,
|
'start_date': row[5],
|
||||||
'end_date': end_timestamp,
|
'end_date': row[6],
|
||||||
|
|
||||||
'stages': fetch_stages(conn, row[0])
|
'stages': fetch_stages(conn, row[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_dict(conn, incident_id) -> dict:
|
||||||
|
"""Fetch an incident and return a dictionary."""
|
||||||
|
row = fetch_incident(conn, incident_id)
|
||||||
|
return incident_dict(conn, row)
|
||||||
|
|
||||||
|
|
||||||
@bp.get('/api/incidents/current')
|
@bp.get('/api/incidents/current')
|
||||||
async def get_current_incident(request):
|
async def get_current_incident(request):
|
||||||
"""Get the current incident, if any."""
|
"""Get the current incident, if any."""
|
||||||
|
@ -111,19 +139,18 @@ async def get_incidents(request, page: int):
|
||||||
for row in rows:
|
for row in rows:
|
||||||
res.append(incident_dict(manager.conn, row))
|
res.append(incident_dict(manager.conn, row))
|
||||||
|
|
||||||
return response.json({
|
return response.json(res)
|
||||||
'incidents': res,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
@bp.put('/api/incidents')
|
@bp.put('/api/incidents')
|
||||||
@auth_route
|
@auth_route
|
||||||
async def create_incident(request):
|
async def create_incident(request):
|
||||||
|
"""Create a new incident and put it as ongoing by default."""
|
||||||
incident = request.json
|
incident = request.json
|
||||||
manager = request.app.manager
|
manager = request.app.manager
|
||||||
|
|
||||||
incident_id = get_snowflake()
|
incident_id = get_snowflake()
|
||||||
start_timestamp = time.time()
|
start_timestamp = _time()
|
||||||
|
|
||||||
manager.conn.execute("""
|
manager.conn.execute("""
|
||||||
INSERT INTO incidents (id, incident_type, title, content,
|
INSERT INTO incidents (id, incident_type, title, content,
|
||||||
|
@ -138,15 +165,78 @@ async def create_incident(request):
|
||||||
))
|
))
|
||||||
manager.conn.commit()
|
manager.conn.commit()
|
||||||
|
|
||||||
d_incident = incident_dict(manager.conn, (
|
# refetch so we know we have the good stuff
|
||||||
|
incident = fetch_dict(manager.conn, incident_id)
|
||||||
|
manager.publish_incident(OP.INCIDENT_NEW, incident)
|
||||||
|
return response.json(incident)
|
||||||
|
|
||||||
|
|
||||||
|
@bp.patch('/api/incident/<incident_id:int>')
|
||||||
|
@auth_route
|
||||||
|
async def patch_incident(request, incident_id):
|
||||||
|
"""Patch an existing incident."""
|
||||||
|
incident = request.json
|
||||||
|
manager = request.app.manager
|
||||||
|
|
||||||
|
if 'end_timestamp' not in incident and not incident['ongoing']:
|
||||||
|
incident['end_timestamp'] = _time()
|
||||||
|
|
||||||
|
orig = fetch_dict(manager.conn, incident_id)
|
||||||
|
|
||||||
|
def _get(field):
|
||||||
|
return incident.get(field, orig[field])
|
||||||
|
|
||||||
|
manager.conn.execute("""
|
||||||
|
UPDATE incidents
|
||||||
|
SET
|
||||||
|
incident_type = ?,
|
||||||
|
title = ?,
|
||||||
|
content = ?,
|
||||||
|
ongoing = ?,
|
||||||
|
start_timestamp = ?,
|
||||||
|
end_timestamp = ?
|
||||||
|
WHERE
|
||||||
|
id = ?
|
||||||
|
""", (
|
||||||
|
_get('type'),
|
||||||
|
_get('title'),
|
||||||
|
_get('content'),
|
||||||
|
_get('ongoing'),
|
||||||
|
_get('start_date'),
|
||||||
|
_get('end_date'),
|
||||||
incident_id,
|
incident_id,
|
||||||
incident['type'],
|
|
||||||
incident['title'],
|
|
||||||
incident['content'],
|
|
||||||
True,
|
|
||||||
start_timestamp,
|
|
||||||
None
|
|
||||||
))
|
))
|
||||||
|
|
||||||
manager.publish_incident(OP.INCIDENT_NEW, d_incident)
|
manager.conn.commit()
|
||||||
return response.json(d_incident)
|
|
||||||
|
if incident['ongoing']:
|
||||||
|
publish_update(manager, incident_id)
|
||||||
|
else:
|
||||||
|
manager.publish_incident(OP.INCIDENT_CLOSE, {**incident, **{
|
||||||
|
'id': str(incident_id)
|
||||||
|
}})
|
||||||
|
|
||||||
|
return response.text('', status=204)
|
||||||
|
|
||||||
|
|
||||||
|
@bp.post('/api/incident/<incident_id:int>/stages')
|
||||||
|
@auth_route
|
||||||
|
async def new_stage(request, incident_id):
|
||||||
|
"""Create a new stage in an incident."""
|
||||||
|
stage = request.json
|
||||||
|
manager = request.app.manager
|
||||||
|
|
||||||
|
timestamp = _time()
|
||||||
|
|
||||||
|
manager.conn.execute("""
|
||||||
|
INSERT INTO incident_stages (parent_id, timestamp, title, content)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""", (incident_id, timestamp, stage['title'], stage['content']))
|
||||||
|
manager.conn.commit()
|
||||||
|
|
||||||
|
publish_update(manager, incident_id)
|
||||||
|
|
||||||
|
return response.json({**{
|
||||||
|
'parent_id': str(incident_id),
|
||||||
|
'created_at': timestamp,
|
||||||
|
}, **stage})
|
||||||
|
|
|
@ -19,13 +19,7 @@ class OP:
|
||||||
|
|
||||||
# incident specific
|
# incident specific
|
||||||
INCIDENT_NEW = 4
|
INCIDENT_NEW = 4
|
||||||
|
|
||||||
# when a title of incident updates or smth
|
|
||||||
INC_UPDATE = 5
|
INC_UPDATE = 5
|
||||||
|
|
||||||
# when new stage comes up or
|
|
||||||
# a current one is updated
|
|
||||||
INC_UPDATE_STAGE = 6
|
|
||||||
INCIDENT_CLOSE = 7
|
INCIDENT_CLOSE = 7
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ class ServiceManager:
|
||||||
timestamp bigint,
|
timestamp bigint,
|
||||||
title text,
|
title text,
|
||||||
content text,
|
content text,
|
||||||
PRIMARY KEY (parent_id)
|
PRIMARY KEY (parent_id, timestamp)
|
||||||
);
|
);
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
@ -105,10 +105,10 @@ class ServiceManager:
|
||||||
try:
|
try:
|
||||||
self.subscribers[chan].append(wid)
|
self.subscribers[chan].append(wid)
|
||||||
subscribed.append(chan)
|
subscribed.append(chan)
|
||||||
log.info(f'Subscribed {wid} to {chan}')
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
log.info(f'Subscribed {wid} to {subscribed}')
|
||||||
return subscribed
|
return subscribed
|
||||||
|
|
||||||
def unsubscribe(self, channels: List[str], websocket) -> List[str]:
|
def unsubscribe(self, channels: List[str], websocket) -> List[str]:
|
||||||
|
|
Loading…
Reference in a new issue