From df3f5e170b7b972bbdf96fb099e838cfd69402b1 Mon Sep 17 00:00:00 2001 From: slice Date: Wed, 18 Jul 2018 12:21:44 -0700 Subject: [PATCH] decouple streaming client from app comp --- priv/frontend/package-lock.json | 5 ++ priv/frontend/package.json | 1 + priv/frontend/src/components/App.js | 98 +++-------------------------- priv/frontend/src/util.js | 11 ++-- priv/frontend/src/ws/client.js | 84 +++++++++++++++++++++++++ 5 files changed, 105 insertions(+), 94 deletions(-) create mode 100644 priv/frontend/src/ws/client.js diff --git a/priv/frontend/package-lock.json b/priv/frontend/package-lock.json index 9fb3637..f4073ed 100644 --- a/priv/frontend/package-lock.json +++ b/priv/frontend/package-lock.json @@ -6926,6 +6926,11 @@ "integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA==", "optional": true }, + "nanoevents": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/nanoevents/-/nanoevents-1.0.5.tgz", + "integrity": "sha512-e6wsmRiGoBKaHHavsaKeVZRfxzH6y7evHdbZMTeRPyA0JKChr32quvabb6qPH6zDfEwy2/v/RABRdz8MuHXLJw==" + }, "nanomatch": { "version": "1.2.13", "resolved": "https://registry.npmjs.org/nanomatch/-/nanomatch-1.2.13.tgz", diff --git a/priv/frontend/package.json b/priv/frontend/package.json index dc6005f..c40c68a 100644 --- a/priv/frontend/package.json +++ b/priv/frontend/package.json @@ -8,6 +8,7 @@ "@fortawesome/free-solid-svg-icons": "^5.1.0-11", "@fortawesome/react-fontawesome": "0.1.0-11", "ms": "^2.1.1", + "nanoevents": "^1.0.5", "prop-types": "^15.6.2", "react": "^16.4.0", "react-dom": "^16.4.0", diff --git a/priv/frontend/src/components/App.js b/priv/frontend/src/components/App.js index f07231c..229085f 100644 --- a/priv/frontend/src/components/App.js +++ b/priv/frontend/src/components/App.js @@ -4,14 +4,12 @@ import './App.css' import Service from './Service.js' import ServicePlaceholder from './ServicePlaceholder.js' import DegradedNotice from './DegradedNotice.js' -import OP from '../ws/op.js' +import StreamingClient from '../ws/client' import { log, objectFromEntries } from '../util.js' import { domain as DOMAIN } from '../config.json' export default class App extends Component { - websocket = null - - reconnectionTime = 1000 + client = null state = { loading: true, @@ -21,31 +19,15 @@ export default class App extends Component { async componentDidMount () { await this.loadMetrics() - this.connect() - } - subscribeToChannels () { - if (this.state.metrics == null) { - // fetching state failed, bail - log('not going to subscribe to realtime updates due to fetch failure') - return - } + const endpoint = `${DOMAIN}/api/streaming` + .replace('https', 'wss') + .replace('http', 'ws') + this.client = new StreamingClient(endpoint, this.state.metrics) + this.client.connect() - const { graph } = this.state.metrics - - const channels = Object.keys(graph).reduce( - (channels, name) => [...channels, `latency:${name}`, `status:${name}`], - [] - ) - - log( - `subscribing to ${channels.length} channels: ${channels.join(', ')}` - ) - - this._send({ - op: OP.SUBSCRIBE, - channels, - }) + this.client.on('status', this.handleStatus.bind(this)) + this.client.on('latency', this.handleLatency.bind(this)) } handleStatus (name, [, status]) { @@ -92,68 +74,6 @@ export default class App extends Component { }) } - handlePacket (packet) { - const { op, c: channel, d: data } = packet - - if (op !== OP.DATA) { - return - } - - const [type, name] = channel.split(':') - - log('updating from channel:', channel) - - if (type === 'latency') { - this.handleLatency(name, data) - } else if (type === 'status') { - this.handleStatus(name, data) - } - } - - connect () { - log('connecting to ws') - - const endpoint = `${DOMAIN}/api/streaming` - .replace('https', 'wss') - .replace('http', 'ws') - - this.websocket = new WebSocket(endpoint) - - this.websocket.onopen = () => { - log('ws opened') - this.subscribeToChannels() - } - - this.websocket.onclose = ({ code, reason }) => { - log(`ws closed with code ${code} (reason: ${reason || ''}); ` + - `attempting to reconnect in ${this.reconnectionTime}ms`) - setTimeout(() => this.connect(), this.reconnectionTime) - this.reconnectionTime *= 2 - } - - this.websocket.onmessage = (message) => { - const { data } = message - const parsed = JSON.parse(data) - - console.log( - '%c>>>%c', - 'color: hsla(320, 100%, 50%, 1); font-weight: bold', - 'color: inherit; font-weight: inherit', - parsed, - ) - - this.handlePacket(parsed) - } - - this.websocket.onerror = (event) => { - log('ws error:', event) - } - } - - _send (payload) { - this.websocket.send(JSON.stringify(payload)) - } - async loadMetrics () { log('loading metrics') diff --git a/priv/frontend/src/util.js b/priv/frontend/src/util.js index fc9bec2..ba96c3e 100644 --- a/priv/frontend/src/util.js +++ b/priv/frontend/src/util.js @@ -1,11 +1,12 @@ -export function log (...args) { +export const logger = ({ name, color }) => (...args) => console.log( - '%c[elstat]%c', - 'color: purple; font-weight: bold', + `%c[${name}]%c`, + `color: ${color}; font-weight: bold`, 'color: inherit; font-weight: inherit', - ...args, + ...args ) -} + +export const log = logger({ name: 'elstat', color: 'purple' }) export function objectFromEntries (entries) { return entries.reduce( diff --git a/priv/frontend/src/ws/client.js b/priv/frontend/src/ws/client.js new file mode 100644 index 0000000..fd773be --- /dev/null +++ b/priv/frontend/src/ws/client.js @@ -0,0 +1,84 @@ +import NanoEvents from 'nanoevents' + +import OP from './op' +import { logger } from '../util' +const log = logger({ name: 'ws', color: 'green' }) + +export default class StreamingClient extends NanoEvents { + constructor (url, metrics) { + super() + + this.url = url + this.metrics = metrics + this.ws = null + this.delay = 1000 + } + + send (packet) { + this.ws.send(JSON.stringify(packet)) + } + + subscribe () { + if (!this.metrics) { + log('not subscribing to channels -- no initial metrics') + return + } + + log('subscribing to channels') + + const { status } = this.metrics + const channels = Object.keys(status).reduce( + (channels, name) => [...channels, `latency:${name}`, `status:${name}`], + [] + ) + + log(`subscribing to ${channels.length} channels: ${channels.join(', ')}`) + this.send({ op: OP.SUBSCRIBE, channels }) + } + + handle (packet) { + const { op, c: channel, d: data } = packet + + if (op !== OP.DATA) { + return + } + + const [type, name] = channel.split(':') + + this.emit(type, name, data) + } + + connect () { + log('connecting') + + const ws = this.ws = new WebSocket(this.url) + window.ws = ws + + ws.onopen = () => { + log('connected') + this.subscribe() + } + + ws.onclose = ({ code, reason }) => { + log( + `ws closed with code ${code} (reason: ${reason || ''}); ` + + `attempting to reconnect in ${this.delay}ms` + ) + + setTimeout(() => this.connect(), this.delay) + this.delay *= 2 + } + + ws.onmessage = ({ data }) => { + log(`recv: ${data}`) + const packet = JSON.parse(data) + + this.emit('packet', packet) + this.handle(packet) + } + + ws.onerror = (err) => { + log(`error: ${err}`) + } + } +}