diff --git a/docs/streaming.md b/docs/streaming.md new file mode 100644 index 0000000..be8e411 --- /dev/null +++ b/docs/streaming.md @@ -0,0 +1,82 @@ +# Streaming API + +Elstat's Streaming API enables clients to receive realtime +updates from the server. + +Clients can subscribe just to *certain* channels in the case +they don't want to have a lot of overhead, or they can just +not use the Streaming API at all. It is optional. + + +## Common payload format + +Payloads can not exceed 256 bytes in size. +Payloads are JSON encoded. + +```javascript +{ + "op": ANY_INTEGER +} +``` + +Extra fields are on each OP's description, instead of being in +some `d` field (like Discord's Gateway API). + + +## Error codes + +| Error code | Name | +|---:|:---| +|4200|`INVALID_PAYLOAD`| +|4420|`TOO_MUCH`| + + +## OP Codes + +| OP Int | OP Name | Sent/Received by client | +|--:|:--|:--:| +|-1|`UNSUBSCRIBE`|Sent| +|0|`SUBSCRIBE`|Sent| +|1|`SUBSCRIBED`|Received| +|2|`UNSUBSCRIBED`|Received| +|3|`DATA`|Received| + + +## OP Payloads + + - `SUBSCRIBE` + - field: `channels` (`list[str]`), array of channels you want to subscribe to. + - reply: `SUBSCRIBED` payload. + - `UNSUBSCRIBE` + - field: `channels` (`list[str]`), array of channels you want to **un**subscribe from. + - reply: `UNSUBSCRIBED` payload. + - `SUBSCRIBED` + - field: `channels` (`list[str]`), array of channels you *succesfully* subscribed to. + - `UNSUBSCRIBED` + - field: `channels` (`list[str]`), array of channels you *succesfully* **un**subscribed from. + - `DATA` + - field: `c` (`str`), the channel the data is coming from. + - data: `d` (`list[any]`), the data coming in from the channel. + - The first element of `d` is an integer, encoding an UNIX timestamp with millisecond precision. + - The second element of `d` is described on *`Channel types`* + + +## Channel names + +Channels are what clients subscribe to receive data about that channel. +Channels are specified as `:` + +e.g `status:elixire` and `graph:elixire` + +### Channel types + - `status` channel + - Returns a boolean, representing the status of the service + - `latency` channel + - Returns an integer, representing the latency of the service, in *milliseconds*. + + +## Connection logic + + - Connect a websocket to `/api/streaming`. + - Send a `SUBSCRIBE` payload, receive `SUBSCRIBED` back + - Listen to `DATA` payloads and update local state as needed. diff --git a/elstat/blueprints/streaming.py b/elstat/blueprints/streaming.py index c7af518..105a52d 100644 --- a/elstat/blueprints/streaming.py +++ b/elstat/blueprints/streaming.py @@ -11,36 +11,92 @@ log = logging.getLogger(__name__) class OP: + UNSUBSCRIBE = -1 + SUBSCRIBE = 0 SUBSCRIBED = 1 - DATA = 2 + UNSUBSCRIBED = 2 + DATA = 3 class ErrorCodes: + INVALID_PAYLOAD = 4200 TOO_MUCH = 4420 -@bp.websocket('/api/streaming') -async def streaming_ws(request, ws): +async def recv_msg(ws): opening_msg = await ws.recv() if len(opening_msg) > 256: await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data') return - open_payload = json.loads(opening_msg) + try: + return json.loads(opening_msg) + except json.decoder.JSONDecodeError: + await ws.close(code=ErrorCodes.INVALID_PAYLOAD, + reason='invalid json') + return + + +async def recv_op(ws, op: int): + payload = await recv_msg(ws) + + if not payload: + return + + if payload.get('op') != op: + await ws.close(code=ErrorCodes.INVALID_PAYLOAD, + reason='Invalid OP code') + return + + return payload + + +async def _recv_payload_loop(ws, app): + while True: + try: + payload = await recv_msg(ws) + except websockets.exceptions.ConnectionClosed: + break + + if not payload: + continue + + op = payload.get('op') + chans = payload.get('channels') + + if op == OP.SUBSCRIBE: + subscribed = app.manager.subscribe(chans, ws) + await ws.send(json.dumps({ + 'op': OP.SUBSCRIBED, + 'channels': subscribed, + })) + elif op == OP.UNSUBSCRIBE: + unsub = app.manager.unsubscribe(chans, ws) + await ws.send(json.dumps({ + 'op': OP.UNSUBSCRIBED, + 'channels': unsub, + })) + + +@bp.websocket('/api/streaming') +async def streaming_ws(request, ws): + hewwo_msg = await recv_op(ws, OP.SUBSCRIBE) # subscribe the websocket to all channels it wants ws.client_id = uuid.uuid4() - try: - channels = open_payload['channels'] - subscribed = request.app.manager.subscribe(channels, ws) + channels = hewwo_msg['channels'] + subscribed = request.app.manager.subscribe(channels, ws) + try: await ws.send(json.dumps({ 'op': OP.SUBSCRIBED, 'channels': subscribed, })) + request.app.loop.create_task(_recv_payload_loop(ws, request.app)) + # keep websocket alive while True: await ws.ping() diff --git a/elstat/manager.py b/elstat/manager.py index ffbd48a..b48ca39 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -72,28 +72,42 @@ class ServiceManager: for worker in self.workers.values(): worker.stop() - def subscribe(self, channels: List[str], websocket): + def subscribe(self, channels: List[str], websocket) -> List[str]: """Subscribe to a list of channels.""" + wid = websocket.client_id subscribed = [] self._websockets[websocket.client_id] = websocket for chan in channels: try: - self.subscribers[chan].append(websocket.client_id) + self.subscribers[chan].append(wid) subscribed.append(chan) - log.info(f'Subscribed {websocket.client_id} to {chan}') + log.info(f'Subscribed {wid} to {chan}') except KeyError: pass return subscribed + def unsubscribe(self, channels: List[str], websocket) -> List[str]: + wid = websocket.client_id + unsub = [] + + for chan in channels: + try: + self.subscribers[chan].remove(wid) + unsub.append(chan) + log.info(f'Unsubscribed {wid} from {chan}') + except (KeyError, ValueError): + pass + + return unsub + def unsub_all(self, websocket): """Unsubscribe a websocket from all known channels.""" unsub = [] for chan, subs in self.subscribers.items(): - log.info(f'Unsubscribing {websocket.client_id} from {chan}') try: subs.remove(websocket.client_id) unsub.append(chan)