Add streaming API documentation

- streaming: add sub/unsub OP codes, changing from the old behavior of
    just giving a channels array.
 - streaming: properly check payloads against their data and send
    proper errors.

 - manager: add unsubscribe()
This commit is contained in:
Luna Mendes 2018-07-11 18:23:47 -03:00
parent 3fa27f5de6
commit 31a0f2b989
No known key found for this signature in database
GPG key ID: 7D950EEE259CE92F
3 changed files with 163 additions and 11 deletions

82
docs/streaming.md Normal file
View file

@ -0,0 +1,82 @@
# Streaming API
Elstat's Streaming API enables clients to receive realtime
updates from the server.
Clients can subscribe just to *certain* channels in the case
they don't want to have a lot of overhead, or they can just
not use the Streaming API at all. It is optional.
## Common payload format
Payloads can not exceed 256 bytes in size.
Payloads are JSON encoded.
```javascript
{
"op": ANY_INTEGER
}
```
Extra fields are on each OP's description, instead of being in
some `d` field (like Discord's Gateway API).
## Error codes
| Error code | Name |
|---:|:---|
|4200|`INVALID_PAYLOAD`|
|4420|`TOO_MUCH`|
## OP Codes
| OP Int | OP Name | Sent/Received by client |
|--:|:--|:--:|
|-1|`UNSUBSCRIBE`|Sent|
|0|`SUBSCRIBE`|Sent|
|1|`SUBSCRIBED`|Received|
|2|`UNSUBSCRIBED`|Received|
|3|`DATA`|Received|
## OP Payloads
- `SUBSCRIBE`
- field: `channels` (`list[str]`), array of channels you want to subscribe to.
- reply: `SUBSCRIBED` payload.
- `UNSUBSCRIBE`
- field: `channels` (`list[str]`), array of channels you want to **un**subscribe from.
- reply: `UNSUBSCRIBED` payload.
- `SUBSCRIBED`
- field: `channels` (`list[str]`), array of channels you *succesfully* subscribed to.
- `UNSUBSCRIBED`
- field: `channels` (`list[str]`), array of channels you *succesfully* **un**subscribed from.
- `DATA`
- field: `c` (`str`), the channel the data is coming from.
- data: `d` (`list[any]`), the data coming in from the channel.
- The first element of `d` is an integer, encoding an UNIX timestamp with millisecond precision.
- The second element of `d` is described on *`Channel types`*
## Channel names
Channels are what clients subscribe to receive data about that channel.
Channels are specified as `<type>:<name>`
e.g `status:elixire` and `graph:elixire`
### Channel types
- `status` channel
- Returns a boolean, representing the status of the service
- `latency` channel
- Returns an integer, representing the latency of the service, in *milliseconds*.
## Connection logic
- Connect a websocket to `/api/streaming`.
- Send a `SUBSCRIBE` payload, receive `SUBSCRIBED` back
- Listen to `DATA` payloads and update local state as needed.

View file

@ -11,36 +11,92 @@ log = logging.getLogger(__name__)
class OP:
UNSUBSCRIBE = -1
SUBSCRIBE = 0
SUBSCRIBED = 1
DATA = 2
UNSUBSCRIBED = 2
DATA = 3
class ErrorCodes:
INVALID_PAYLOAD = 4200
TOO_MUCH = 4420
@bp.websocket('/api/streaming')
async def streaming_ws(request, ws):
async def recv_msg(ws):
opening_msg = await ws.recv()
if len(opening_msg) > 256:
await ws.close(code=ErrorCodes.TOO_MUCH, reason='Too much data')
return
open_payload = json.loads(opening_msg)
try:
return json.loads(opening_msg)
except json.decoder.JSONDecodeError:
await ws.close(code=ErrorCodes.INVALID_PAYLOAD,
reason='invalid json')
return
async def recv_op(ws, op: int):
payload = await recv_msg(ws)
if not payload:
return
if payload.get('op') != op:
await ws.close(code=ErrorCodes.INVALID_PAYLOAD,
reason='Invalid OP code')
return
return payload
async def _recv_payload_loop(ws, app):
while True:
try:
payload = await recv_msg(ws)
except websockets.exceptions.ConnectionClosed:
break
if not payload:
continue
op = payload.get('op')
chans = payload.get('channels')
if op == OP.SUBSCRIBE:
subscribed = app.manager.subscribe(chans, ws)
await ws.send(json.dumps({
'op': OP.SUBSCRIBED,
'channels': subscribed,
}))
elif op == OP.UNSUBSCRIBE:
unsub = app.manager.unsubscribe(chans, ws)
await ws.send(json.dumps({
'op': OP.UNSUBSCRIBED,
'channels': unsub,
}))
@bp.websocket('/api/streaming')
async def streaming_ws(request, ws):
hewwo_msg = await recv_op(ws, OP.SUBSCRIBE)
# subscribe the websocket to all channels it wants
ws.client_id = uuid.uuid4()
try:
channels = open_payload['channels']
subscribed = request.app.manager.subscribe(channels, ws)
channels = hewwo_msg['channels']
subscribed = request.app.manager.subscribe(channels, ws)
try:
await ws.send(json.dumps({
'op': OP.SUBSCRIBED,
'channels': subscribed,
}))
request.app.loop.create_task(_recv_payload_loop(ws, request.app))
# keep websocket alive
while True:
await ws.ping()

View file

@ -72,28 +72,42 @@ class ServiceManager:
for worker in self.workers.values():
worker.stop()
def subscribe(self, channels: List[str], websocket):
def subscribe(self, channels: List[str], websocket) -> List[str]:
"""Subscribe to a list of channels."""
wid = websocket.client_id
subscribed = []
self._websockets[websocket.client_id] = websocket
for chan in channels:
try:
self.subscribers[chan].append(websocket.client_id)
self.subscribers[chan].append(wid)
subscribed.append(chan)
log.info(f'Subscribed {websocket.client_id} to {chan}')
log.info(f'Subscribed {wid} to {chan}')
except KeyError:
pass
return subscribed
def unsubscribe(self, channels: List[str], websocket) -> List[str]:
wid = websocket.client_id
unsub = []
for chan in channels:
try:
self.subscribers[chan].remove(wid)
unsub.append(chan)
log.info(f'Unsubscribed {wid} from {chan}')
except (KeyError, ValueError):
pass
return unsub
def unsub_all(self, websocket):
"""Unsubscribe a websocket from all known channels."""
unsub = []
for chan, subs in self.subscribers.items():
log.info(f'Unsubscribing {websocket.client_id} from {chan}')
try:
subs.remove(websocket.client_id)
unsub.append(chan)