From 3fa27f5de6ed63ae9f29418f023f76c1c56c7404 Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Wed, 11 Jul 2018 17:00:39 -0300 Subject: [PATCH] move blueprints inside elstat folder - add OP.DATA - manager: add _websockets map - manager: add basic publish logic - worker: add timestamps to published messages --- __init__.py | 0 {blueprints => elstat/blueprints}/__init__.py | 0 {blueprints => elstat/blueprints}/api.py | 0 .../blueprints}/streaming.py | 1 + elstat/manager.py | 36 ++++++++++++++++--- elstat/worker.py | 13 +++---- run.py | 2 +- 7 files changed, 39 insertions(+), 13 deletions(-) create mode 100644 __init__.py rename {blueprints => elstat/blueprints}/__init__.py (100%) rename {blueprints => elstat/blueprints}/api.py (100%) rename {blueprints => elstat/blueprints}/streaming.py (98%) diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/blueprints/__init__.py b/elstat/blueprints/__init__.py similarity index 100% rename from blueprints/__init__.py rename to elstat/blueprints/__init__.py diff --git a/blueprints/api.py b/elstat/blueprints/api.py similarity index 100% rename from blueprints/api.py rename to elstat/blueprints/api.py diff --git a/blueprints/streaming.py b/elstat/blueprints/streaming.py similarity index 98% rename from blueprints/streaming.py rename to elstat/blueprints/streaming.py index 5bdd25d..c7af518 100644 --- a/blueprints/streaming.py +++ b/elstat/blueprints/streaming.py @@ -12,6 +12,7 @@ log = logging.getLogger(__name__) class OP: SUBSCRIBED = 1 + DATA = 2 class ErrorCodes: diff --git a/elstat/manager.py b/elstat/manager.py index 7b85042..ffbd48a 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -1,10 +1,13 @@ import logging +import json from typing import List, Dict, Any from .consts import ADAPTERS from .worker import ServiceWorker +from .blueprints.streaming import OP + log = logging.getLogger(__name__) _COLUMNS = { @@ -24,6 +27,7 @@ class ServiceManager: self.workers = {} self.state = {} self.subscribers = {} + self._websockets = {} self._start() @@ -72,9 +76,13 @@ class ServiceManager: """Subscribe to a list of channels.""" subscribed = [] + self._websockets[websocket.client_id] = websocket + for chan in channels: try: self.subscribers[chan].append(websocket.client_id) + subscribed.append(chan) + log.info(f'Subscribed {websocket.client_id} to {chan}') except KeyError: pass @@ -93,10 +101,30 @@ class ServiceManager: pass log.info(f'unsubscribed {websocket.client_id} from {unsub}') + try: + self._websockets.pop(websocket.client_id) + except KeyError: + pass return unsub - def publish(self, channel: str, data: Any): - pass + def _raw_send(self, websocket, channel: str, data: Any): + if websocket is None: + return - def publish_many(self, data: Dict[str, Any]): - pass + loop = self.app.loop + + return loop.create_task(websocket.send(json.dumps({ + 'op': OP.DATA, + 'c': channel, + 'd': data, + }))) + + def publish(self, channel: str, data: Any): + ws_ids = self.subscribers[channel] + websockets = map(self._websockets.get, ws_ids) + + def _send(websocket): + return self._raw_send(websocket, channel, data) + + tasks = map(_send, websockets) + return list(tasks) diff --git a/elstat/worker.py b/elstat/worker.py index 5f57566..c91201c 100644 --- a/elstat/worker.py +++ b/elstat/worker.py @@ -33,20 +33,17 @@ class ServiceWorker: VALUES ({args_str}) """ - conn.execute(query, (timestamp,) + result) + conn.execute(query, (timestamp, ) + result) conn.commit() - await self._dispatch_work(columns, result) + await self._dispatch_work(columns, timestamp, result) - async def _dispatch_work(self, columns, result: tuple): + async def _dispatch_work(self, columns, timestamp: int, result: tuple): prechan = columns[1:] chans = [f'{chan}:{self.name}' for chan in prechan] - if len(chans) > 1: - self.manager.publish_many({ - chan: result[idx] for idx, chan in enumerate(chans)}) - else: - self.manager.publish(chans[0], result) + for idx, chan in enumerate(chans): + self.manager.publish(chan, (timestamp, result[idx])) async def _work_loop(self): try: diff --git a/run.py b/run.py index 8e5e4e1..53e155f 100644 --- a/run.py +++ b/run.py @@ -8,7 +8,7 @@ from sanic.exceptions import NotFound, FileNotFound import config from elstat import manager -from blueprints import api, streaming +from elstat.blueprints import api, streaming logging.basicConfig(level=logging.INFO) log = logging.getLogger(__name__)