worker: add basic _dispatch_work logic

This commit is contained in:
Luna Mendes 2018-07-11 03:42:31 -03:00
parent 7e6d9d5402
commit 3ec64d5bdd
No known key found for this signature in database
GPG Key ID: 7D950EEE259CE92F
3 changed files with 24 additions and 6 deletions

View File

@ -13,6 +13,7 @@ log = logging.getLogger(__name__)
class OP: class OP:
SUBSCRIBED = 1 SUBSCRIBED = 1
class ErrorCodes: class ErrorCodes:
TOO_MUCH = 4420 TOO_MUCH = 4420

View File

@ -1,6 +1,6 @@
import logging import logging
from typing import List from typing import List, Dict, Any
from .consts import ADAPTERS from .consts import ADAPTERS
from .worker import ServiceWorker from .worker import ServiceWorker
@ -40,9 +40,8 @@ class ServiceManager:
); );
""") """)
def _check(self, columns: tuple, field: str, chan_name: str = None): def _check(self, columns: tuple, field: str, worker_name: str):
if chan_name is None: chan_name = f'{field}:{worker_name}'
chan_name = field
if field in columns and chan_name not in self.subscribers: if field in columns and chan_name not in self.subscribers:
self.subscribers[chan_name] = [] self.subscribers[chan_name] = []
@ -51,8 +50,8 @@ class ServiceManager:
def _create_channels(self, worker): def _create_channels(self, worker):
columns = worker.adapter.spec['db'] columns = worker.adapter.spec['db']
self._check(columns, 'status') self._check(columns, 'status', worker.name)
self._check(columns, 'latency', f'latency:{worker.name}') self._check(columns, 'latency', worker.name)
def _start(self): def _start(self):
for name, service in self.cfg.SERVICES.items(): for name, service in self.cfg.SERVICES.items():
@ -95,3 +94,9 @@ class ServiceManager:
log.info(f'unsubscribed {websocket.client_id} from {unsub}') log.info(f'unsubscribed {websocket.client_id} from {unsub}')
return unsub return unsub
def publish(self, channel: str, data: Any):
pass
def publish_many(self, data: Dict[str, Any]):
pass

View File

@ -36,6 +36,18 @@ class ServiceWorker:
conn.execute(query, (timestamp,) + result) conn.execute(query, (timestamp,) + result)
conn.commit() conn.commit()
await self._dispatch_work(columns, result)
async def _dispatch_work(self, columns, result: tuple):
prechan = columns[1:]
chans = [f'{chan}:{self.name}' for chan in prechan]
if len(chans) > 1:
self.manager.publish_many({
chan: result[idx] for idx, chan in enumerate(chans)})
else:
self.manager.publish(chans[0], result)
async def _work_loop(self): async def _work_loop(self):
try: try:
while True: while True: