111 lines
2.6 KiB
Python
111 lines
2.6 KiB
Python
import json
|
|
import uuid
|
|
import logging
|
|
import asyncio
|
|
|
|
import websockets
|
|
from sanic import Blueprint
|
|
|
|
bp = Blueprint(__name__)
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class OP:
|
|
UNSUBSCRIBE = -1
|
|
SUBSCRIBE = 0
|
|
SUBSCRIBED = 1
|
|
UNSUBSCRIBED = 2
|
|
DATA = 3
|
|
|
|
# incident specific
|
|
INCIDENT_NEW = 4
|
|
INCIDENT_UPDATE = 5
|
|
INCIDENT_CLOSE = 6
|
|
|
|
|
|
class ErrorCodes:
|
|
INVALID_PAYLOAD = 4200
|
|
TOO_MUCH = 4420
|
|
|
|
|
|
async def recv_msg(ws):
|
|
opening_msg = await ws.recv()
|
|
|
|
if len(opening_msg) > 256:
|
|
await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data')
|
|
return
|
|
|
|
try:
|
|
return json.loads(opening_msg)
|
|
except json.decoder.JSONDecodeError:
|
|
await ws.close(code=ErrorCodes.INVALID_PAYLOAD,
|
|
reason='invalid json')
|
|
return
|
|
|
|
|
|
async def recv_op(ws, op: int):
|
|
payload = await recv_msg(ws)
|
|
|
|
if not payload:
|
|
return
|
|
|
|
if payload.get('op') != op:
|
|
await ws.close(code=ErrorCodes.INVALID_PAYLOAD,
|
|
reason='Invalid OP code')
|
|
return
|
|
|
|
return payload
|
|
|
|
|
|
async def _recv_payload_loop(ws, app):
|
|
while True:
|
|
try:
|
|
payload = await recv_msg(ws)
|
|
except websockets.exceptions.ConnectionClosed:
|
|
break
|
|
|
|
if not payload:
|
|
continue
|
|
|
|
op = payload.get('op')
|
|
chans = payload.get('channels')
|
|
|
|
if op == OP.SUBSCRIBE:
|
|
subscribed = app.manager.subscribe(chans, ws)
|
|
await ws.send(json.dumps({
|
|
'op': OP.SUBSCRIBED,
|
|
'channels': subscribed,
|
|
}))
|
|
elif op == OP.UNSUBSCRIBE:
|
|
unsub = app.manager.unsubscribe(chans, ws)
|
|
await ws.send(json.dumps({
|
|
'op': OP.UNSUBSCRIBED,
|
|
'channels': unsub,
|
|
}))
|
|
|
|
|
|
@bp.websocket('/api/streaming')
|
|
async def streaming_ws(request, ws):
|
|
hewwo_msg = await recv_op(ws, OP.SUBSCRIBE)
|
|
|
|
# subscribe the websocket to all channels it wants
|
|
ws.client_id = uuid.uuid4()
|
|
|
|
channels = hewwo_msg['channels']
|
|
subscribed = request.app.manager.subscribe(channels, ws)
|
|
|
|
try:
|
|
await ws.send(json.dumps({
|
|
'op': OP.SUBSCRIBED,
|
|
'channels': subscribed,
|
|
}))
|
|
|
|
request.app.loop.create_task(_recv_payload_loop(ws, request.app))
|
|
|
|
# keep websocket alive
|
|
while True:
|
|
await ws.ping()
|
|
await asyncio.sleep(1)
|
|
except websockets.exceptions.ConnectionClosed as wc:
|
|
log.warning(f'conn {ws.client_id} closed: {wc.code} {wc.reason!r}')
|
|
request.app.manager.unsub_all(ws)
|