import json import uuid import logging import asyncio import websockets from sanic import Blueprint bp = Blueprint(__name__) log = logging.getLogger(__name__) class OP: UNSUBSCRIBE = -1 SUBSCRIBE = 0 SUBSCRIBED = 1 UNSUBSCRIBED = 2 DATA = 3 # incident specific INCIDENT_NEW = 4 INC_UPDATE = 5 INCIDENT_CLOSE = 6 class ErrorCodes: INVALID_PAYLOAD = 4200 TOO_MUCH = 4420 async def recv_msg(ws): opening_msg = await ws.recv() if len(opening_msg) > 256: await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data') return try: return json.loads(opening_msg) except json.decoder.JSONDecodeError: await ws.close(code=ErrorCodes.INVALID_PAYLOAD, reason='invalid json') return async def recv_op(ws, op: int): payload = await recv_msg(ws) if not payload: return if payload.get('op') != op: await ws.close(code=ErrorCodes.INVALID_PAYLOAD, reason='Invalid OP code') return return payload async def _recv_payload_loop(ws, app): while True: try: payload = await recv_msg(ws) except websockets.exceptions.ConnectionClosed: break if not payload: continue op = payload.get('op') chans = payload.get('channels') if op == OP.SUBSCRIBE: subscribed = app.manager.subscribe(chans, ws) await ws.send(json.dumps({ 'op': OP.SUBSCRIBED, 'channels': subscribed, })) elif op == OP.UNSUBSCRIBE: unsub = app.manager.unsubscribe(chans, ws) await ws.send(json.dumps({ 'op': OP.UNSUBSCRIBED, 'channels': unsub, })) @bp.websocket('/api/streaming') async def streaming_ws(request, ws): hewwo_msg = await recv_op(ws, OP.SUBSCRIBE) # subscribe the websocket to all channels it wants ws.client_id = uuid.uuid4() channels = hewwo_msg['channels'] subscribed = request.app.manager.subscribe(channels, ws) try: await ws.send(json.dumps({ 'op': OP.SUBSCRIBED, 'channels': subscribed, })) request.app.loop.create_task(_recv_payload_loop(ws, request.app)) # keep websocket alive while True: await ws.ping() await asyncio.sleep(1) except websockets.exceptions.ConnectionClosed as wc: log.warning(f'connection {ws.client_id} closed: {wc!r}') request.app.manager.unsub_all(ws)