From 3ec64d5bddd0d06455de6f8eea365de7795f1b65 Mon Sep 17 00:00:00 2001 From: Luna Mendes Date: Wed, 11 Jul 2018 03:42:31 -0300 Subject: [PATCH] worker: add basic _dispatch_work logic --- blueprints/streaming.py | 1 + elstat/manager.py | 17 +++++++++++------ elstat/worker.py | 12 ++++++++++++ 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/blueprints/streaming.py b/blueprints/streaming.py index 87bcbb5..5bdd25d 100644 --- a/blueprints/streaming.py +++ b/blueprints/streaming.py @@ -13,6 +13,7 @@ log = logging.getLogger(__name__) class OP: SUBSCRIBED = 1 + class ErrorCodes: TOO_MUCH = 4420 diff --git a/elstat/manager.py b/elstat/manager.py index 4b95260..7b85042 100644 --- a/elstat/manager.py +++ b/elstat/manager.py @@ -1,6 +1,6 @@ import logging -from typing import List +from typing import List, Dict, Any from .consts import ADAPTERS from .worker import ServiceWorker @@ -40,9 +40,8 @@ class ServiceManager: ); """) - def _check(self, columns: tuple, field: str, chan_name: str = None): - if chan_name is None: - chan_name = field + def _check(self, columns: tuple, field: str, worker_name: str): + chan_name = f'{field}:{worker_name}' if field in columns and chan_name not in self.subscribers: self.subscribers[chan_name] = [] @@ -51,8 +50,8 @@ class ServiceManager: def _create_channels(self, worker): columns = worker.adapter.spec['db'] - self._check(columns, 'status') - self._check(columns, 'latency', f'latency:{worker.name}') + self._check(columns, 'status', worker.name) + self._check(columns, 'latency', worker.name) def _start(self): for name, service in self.cfg.SERVICES.items(): @@ -95,3 +94,9 @@ class ServiceManager: log.info(f'unsubscribed {websocket.client_id} from {unsub}') return unsub + + def publish(self, channel: str, data: Any): + pass + + def publish_many(self, data: Dict[str, Any]): + pass diff --git a/elstat/worker.py b/elstat/worker.py index b433414..5f57566 100644 --- a/elstat/worker.py +++ b/elstat/worker.py @@ -36,6 +36,18 @@ class ServiceWorker: conn.execute(query, (timestamp,) + result) conn.commit() + await self._dispatch_work(columns, result) + + async def _dispatch_work(self, columns, result: tuple): + prechan = columns[1:] + chans = [f'{chan}:{self.name}' for chan in prechan] + + if len(chans) > 1: + self.manager.publish_many({ + chan: result[idx] for idx, chan in enumerate(chans)}) + else: + self.manager.publish(chans[0], result) + async def _work_loop(self): try: while True: