diff --git a/blueprints/__init__.py b/blueprints/__init__.py index 918d21c..07fd0b8 100644 --- a/blueprints/__init__.py +++ b/blueprints/__init__.py @@ -1 +1,2 @@ from .api import bp as api +from .streaming import bp as streaming diff --git a/blueprints/api.py b/blueprints/api.py index 31cdab5..8287c25 100644 --- a/blueprints/api.py +++ b/blueprints/api.py @@ -2,6 +2,7 @@ from sanic import Blueprint, response bp = Blueprint(__name__) + def get_status(manager): res = {} diff --git a/blueprints/streaming.py b/blueprints/streaming.py new file mode 100644 index 0000000..87bcbb5 --- /dev/null +++ b/blueprints/streaming.py @@ -0,0 +1,48 @@ +import json +import uuid +import logging +import asyncio + +import websockets +from sanic import Blueprint + +bp = Blueprint(__name__) +log = logging.getLogger(__name__) + + +class OP: + SUBSCRIBED = 1 + +class ErrorCodes: + TOO_MUCH = 4420 + + +@bp.websocket('/api/streaming') +async def streaming_ws(request, ws): + opening_msg = await ws.recv() + + if len(opening_msg) > 256: + await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data') + return + + open_payload = json.loads(opening_msg) + + # subscribe the websocket to all channels it wants + ws.client_id = uuid.uuid4() + + try: + channels = open_payload['channels'] + subscribed = request.app.manager.subscribe(channels, ws) + + await ws.send(json.dumps({ + 'op': OP.SUBSCRIBED, + 'channels': subscribed, + })) + + # keep websocket alive + while True: + await ws.ping() + await asyncio.sleep(1) + except websockets.exceptions.ConnectionClosed as wc: + log.warning(f'connection {ws.client_id} closed: {wc!r}') + request.app.manager.unsub_all(ws) diff --git a/elstat/manager.py b/elstat/manager.py index 559aba6..19c3353 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -1,5 +1,7 @@ import logging +from typing import List + from .consts import ADAPTERS from .worker import ServiceWorker @@ -21,6 +23,7 @@ class ServiceManager: self.workers = {} self.state = {} + self.subscribers = {} self._start() @@ -46,3 +49,30 @@ class ServiceManager: self.workers[name] = serv_worker self.state[name] = None + + def subscribe(self, channels: List[str], websocket): + """Subscribe to a list of channels.""" + subscribed = [] + + for chan in channels: + try: + self.subscribers[chan].append(websocket.client_id) + except KeyError: + pass + + return subscribed + + def unsub_all(self, websocket): + """Unsubscribe a websocket from all known channels.""" + unsub = [] + + for chan, subs in self.subscribers.items(): + log.info(f'Unsubscribing {websocket.client_id} from {chan}') + try: + subs.remove(websocket.client_id) + unsub.append(chan) + except ValueError: + pass + + log.info(f'unsubscribed {websocket.client_id} from {unsub}') + return unsub diff --git a/run.py b/run.py index 986a361..bc56b93 100644 --- a/run.py +++ b/run.py @@ -8,7 +8,7 @@ from sanic.exceptions import NotFound, FileNotFound import config from elstat import manager -from blueprints import api +from blueprints import api, streaming logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__) @@ -18,6 +18,7 @@ app.cfg = config CORS(app, automatic_options=True) app.blueprint(api) +app.blueprint(streaming) @app.listener('before_server_start')