move blueprints inside elstat folder

- add OP.DATA
 - manager: add _websockets map
 - manager: add basic publish logic
 - worker: add timestamps to published messages
This commit is contained in:
Luna Mendes 2018-07-11 17:00:39 -03:00
parent 3ec64d5bdd
commit 3fa27f5de6
No known key found for this signature in database
GPG key ID: 7D950EEE259CE92F
7 changed files with 39 additions and 13 deletions

0
__init__.py Normal file
View file

View file

@ -12,6 +12,7 @@ log = logging.getLogger(__name__)
class OP: class OP:
SUBSCRIBED = 1 SUBSCRIBED = 1
DATA = 2
class ErrorCodes: class ErrorCodes:

View file

@ -1,10 +1,13 @@
import logging import logging
import json
from typing import List, Dict, Any from typing import List, Dict, Any
from .consts import ADAPTERS from .consts import ADAPTERS
from .worker import ServiceWorker from .worker import ServiceWorker
from .blueprints.streaming import OP
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
_COLUMNS = { _COLUMNS = {
@ -24,6 +27,7 @@ class ServiceManager:
self.workers = {} self.workers = {}
self.state = {} self.state = {}
self.subscribers = {} self.subscribers = {}
self._websockets = {}
self._start() self._start()
@ -72,9 +76,13 @@ class ServiceManager:
"""Subscribe to a list of channels.""" """Subscribe to a list of channels."""
subscribed = [] subscribed = []
self._websockets[websocket.client_id] = websocket
for chan in channels: for chan in channels:
try: try:
self.subscribers[chan].append(websocket.client_id) self.subscribers[chan].append(websocket.client_id)
subscribed.append(chan)
log.info(f'Subscribed {websocket.client_id} to {chan}')
except KeyError: except KeyError:
pass pass
@ -93,10 +101,30 @@ class ServiceManager:
pass pass
log.info(f'unsubscribed {websocket.client_id} from {unsub}') log.info(f'unsubscribed {websocket.client_id} from {unsub}')
try:
self._websockets.pop(websocket.client_id)
except KeyError:
pass
return unsub return unsub
def publish(self, channel: str, data: Any): def _raw_send(self, websocket, channel: str, data: Any):
pass if websocket is None:
return
def publish_many(self, data: Dict[str, Any]): loop = self.app.loop
pass
return loop.create_task(websocket.send(json.dumps({
'op': OP.DATA,
'c': channel,
'd': data,
})))
def publish(self, channel: str, data: Any):
ws_ids = self.subscribers[channel]
websockets = map(self._websockets.get, ws_ids)
def _send(websocket):
return self._raw_send(websocket, channel, data)
tasks = map(_send, websockets)
return list(tasks)

View file

@ -33,20 +33,17 @@ class ServiceWorker:
VALUES ({args_str}) VALUES ({args_str})
""" """
conn.execute(query, (timestamp,) + result) conn.execute(query, (timestamp, ) + result)
conn.commit() conn.commit()
await self._dispatch_work(columns, result) await self._dispatch_work(columns, timestamp, result)
async def _dispatch_work(self, columns, result: tuple): async def _dispatch_work(self, columns, timestamp: int, result: tuple):
prechan = columns[1:] prechan = columns[1:]
chans = [f'{chan}:{self.name}' for chan in prechan] chans = [f'{chan}:{self.name}' for chan in prechan]
if len(chans) > 1: for idx, chan in enumerate(chans):
self.manager.publish_many({ self.manager.publish(chan, (timestamp, result[idx]))
chan: result[idx] for idx, chan in enumerate(chans)})
else:
self.manager.publish(chans[0], result)
async def _work_loop(self): async def _work_loop(self):
try: try:

2
run.py
View file

@ -8,7 +8,7 @@ from sanic.exceptions import NotFound, FileNotFound
import config import config
from elstat import manager from elstat import manager
from blueprints import api, streaming from elstat.blueprints import api, streaming
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)