blueprints: add streaming blueprint
clients can connect and subscribe to different channels and they will get messages from those channels (basic pub/sub) this is useful in the case of a client wanting issue updates in real time, which is better than the client having to poll /api/status every now and then. - manager: add ServiceManager.subscribe and .unsub_all
This commit is contained in:
parent
1de3bbdd95
commit
053762f69f
5 changed files with 82 additions and 1 deletions
|
@ -1 +1,2 @@
|
|||
from .api import bp as api
|
||||
from .streaming import bp as streaming
|
||||
|
|
|
@ -2,6 +2,7 @@ from sanic import Blueprint, response
|
|||
|
||||
bp = Blueprint(__name__)
|
||||
|
||||
|
||||
def get_status(manager):
|
||||
res = {}
|
||||
|
||||
|
|
48
blueprints/streaming.py
Normal file
48
blueprints/streaming.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
import json
|
||||
import uuid
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
import websockets
|
||||
from sanic import Blueprint
|
||||
|
||||
bp = Blueprint(__name__)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OP:
|
||||
SUBSCRIBED = 1
|
||||
|
||||
class ErrorCodes:
|
||||
TOO_MUCH = 4420
|
||||
|
||||
|
||||
@bp.websocket('/api/streaming')
|
||||
async def streaming_ws(request, ws):
|
||||
opening_msg = await ws.recv()
|
||||
|
||||
if len(opening_msg) > 256:
|
||||
await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data')
|
||||
return
|
||||
|
||||
open_payload = json.loads(opening_msg)
|
||||
|
||||
# subscribe the websocket to all channels it wants
|
||||
ws.client_id = uuid.uuid4()
|
||||
|
||||
try:
|
||||
channels = open_payload['channels']
|
||||
subscribed = request.app.manager.subscribe(channels, ws)
|
||||
|
||||
await ws.send(json.dumps({
|
||||
'op': OP.SUBSCRIBED,
|
||||
'channels': subscribed,
|
||||
}))
|
||||
|
||||
# keep websocket alive
|
||||
while True:
|
||||
await ws.ping()
|
||||
await asyncio.sleep(1)
|
||||
except websockets.exceptions.ConnectionClosed as wc:
|
||||
log.warning(f'connection {ws.client_id} closed: {wc!r}')
|
||||
request.app.manager.unsub_all(ws)
|
|
@ -1,5 +1,7 @@
|
|||
import logging
|
||||
|
||||
from typing import List
|
||||
|
||||
from .consts import ADAPTERS
|
||||
from .worker import ServiceWorker
|
||||
|
||||
|
@ -21,6 +23,7 @@ class ServiceManager:
|
|||
|
||||
self.workers = {}
|
||||
self.state = {}
|
||||
self.subscribers = {}
|
||||
|
||||
self._start()
|
||||
|
||||
|
@ -46,3 +49,30 @@ class ServiceManager:
|
|||
self.workers[name] = serv_worker
|
||||
|
||||
self.state[name] = None
|
||||
|
||||
def subscribe(self, channels: List[str], websocket):
|
||||
"""Subscribe to a list of channels."""
|
||||
subscribed = []
|
||||
|
||||
for chan in channels:
|
||||
try:
|
||||
self.subscribers[chan].append(websocket.client_id)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
return subscribed
|
||||
|
||||
def unsub_all(self, websocket):
|
||||
"""Unsubscribe a websocket from all known channels."""
|
||||
unsub = []
|
||||
|
||||
for chan, subs in self.subscribers.items():
|
||||
log.info(f'Unsubscribing {websocket.client_id} from {chan}')
|
||||
try:
|
||||
subs.remove(websocket.client_id)
|
||||
unsub.append(chan)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
log.info(f'unsubscribed {websocket.client_id} from {unsub}')
|
||||
return unsub
|
||||
|
|
3
run.py
3
run.py
|
@ -8,7 +8,7 @@ from sanic.exceptions import NotFound, FileNotFound
|
|||
|
||||
import config
|
||||
from elstat import manager
|
||||
from blueprints import api
|
||||
from blueprints import api, streaming
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -18,6 +18,7 @@ app.cfg = config
|
|||
CORS(app, automatic_options=True)
|
||||
|
||||
app.blueprint(api)
|
||||
app.blueprint(streaming)
|
||||
|
||||
|
||||
@app.listener('before_server_start')
|
||||
|
|
Loading…
Reference in a new issue