decouple streaming client from app comp

This commit is contained in:
Skip R. 2018-07-18 12:21:44 -07:00
parent cb5ffadcba
commit df3f5e170b
No known key found for this signature in database
GPG key ID: 1508C19D7436A26D
5 changed files with 105 additions and 94 deletions

View file

@ -6926,6 +6926,11 @@
"integrity": "sha512-bAdJv7fBLhWC+/Bls0Oza+mvTaNQtP+1RyhhhvD95pgUJz6XM5IzgmxOkItJ9tkoCiplvAnXI1tNmmUD/eScyA==",
"optional": true
},
"nanoevents": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/nanoevents/-/nanoevents-1.0.5.tgz",
"integrity": "sha512-e6wsmRiGoBKaHHavsaKeVZRfxzH6y7evHdbZMTeRPyA0JKChr32quvabb6qPH6zDfEwy2/v/RABRdz8MuHXLJw=="
},
"nanomatch": {
"version": "1.2.13",
"resolved": "https://registry.npmjs.org/nanomatch/-/nanomatch-1.2.13.tgz",

View file

@ -8,6 +8,7 @@
"@fortawesome/free-solid-svg-icons": "^5.1.0-11",
"@fortawesome/react-fontawesome": "0.1.0-11",
"ms": "^2.1.1",
"nanoevents": "^1.0.5",
"prop-types": "^15.6.2",
"react": "^16.4.0",
"react-dom": "^16.4.0",

View file

@ -4,14 +4,12 @@ import './App.css'
import Service from './Service.js'
import ServicePlaceholder from './ServicePlaceholder.js'
import DegradedNotice from './DegradedNotice.js'
import OP from '../ws/op.js'
import StreamingClient from '../ws/client'
import { log, objectFromEntries } from '../util.js'
import { domain as DOMAIN } from '../config.json'
export default class App extends Component {
websocket = null
reconnectionTime = 1000
client = null
state = {
loading: true,
@ -21,31 +19,15 @@ export default class App extends Component {
async componentDidMount () {
await this.loadMetrics()
this.connect()
}
subscribeToChannels () {
if (this.state.metrics == null) {
// fetching state failed, bail
log('not going to subscribe to realtime updates due to fetch failure')
return
}
const endpoint = `${DOMAIN}/api/streaming`
.replace('https', 'wss')
.replace('http', 'ws')
this.client = new StreamingClient(endpoint, this.state.metrics)
this.client.connect()
const { graph } = this.state.metrics
const channels = Object.keys(graph).reduce(
(channels, name) => [...channels, `latency:${name}`, `status:${name}`],
[]
)
log(
`subscribing to ${channels.length} channels: ${channels.join(', ')}`
)
this._send({
op: OP.SUBSCRIBE,
channels,
})
this.client.on('status', this.handleStatus.bind(this))
this.client.on('latency', this.handleLatency.bind(this))
}
handleStatus (name, [, status]) {
@ -92,68 +74,6 @@ export default class App extends Component {
})
}
handlePacket (packet) {
const { op, c: channel, d: data } = packet
if (op !== OP.DATA) {
return
}
const [type, name] = channel.split(':')
log('updating from channel:', channel)
if (type === 'latency') {
this.handleLatency(name, data)
} else if (type === 'status') {
this.handleStatus(name, data)
}
}
connect () {
log('connecting to ws')
const endpoint = `${DOMAIN}/api/streaming`
.replace('https', 'wss')
.replace('http', 'ws')
this.websocket = new WebSocket(endpoint)
this.websocket.onopen = () => {
log('ws opened')
this.subscribeToChannels()
}
this.websocket.onclose = ({ code, reason }) => {
log(`ws closed with code ${code} (reason: ${reason || '<none>'}); ` +
`attempting to reconnect in ${this.reconnectionTime}ms`)
setTimeout(() => this.connect(), this.reconnectionTime)
this.reconnectionTime *= 2
}
this.websocket.onmessage = (message) => {
const { data } = message
const parsed = JSON.parse(data)
console.log(
'%c>>>%c',
'color: hsla(320, 100%, 50%, 1); font-weight: bold',
'color: inherit; font-weight: inherit',
parsed,
)
this.handlePacket(parsed)
}
this.websocket.onerror = (event) => {
log('ws error:', event)
}
}
_send (payload) {
this.websocket.send(JSON.stringify(payload))
}
async loadMetrics () {
log('loading metrics')

View file

@ -1,11 +1,12 @@
export function log (...args) {
export const logger = ({ name, color }) => (...args) =>
console.log(
'%c[elstat]%c',
'color: purple; font-weight: bold',
`%c[${name}]%c`,
`color: ${color}; font-weight: bold`,
'color: inherit; font-weight: inherit',
...args,
...args
)
}
export const log = logger({ name: 'elstat', color: 'purple' })
export function objectFromEntries (entries) {
return entries.reduce(

View file

@ -0,0 +1,84 @@
import NanoEvents from 'nanoevents'
import OP from './op'
import { logger } from '../util'
const log = logger({ name: 'ws', color: 'green' })
export default class StreamingClient extends NanoEvents {
constructor (url, metrics) {
super()
this.url = url
this.metrics = metrics
this.ws = null
this.delay = 1000
}
send (packet) {
this.ws.send(JSON.stringify(packet))
}
subscribe () {
if (!this.metrics) {
log('not subscribing to channels -- no initial metrics')
return
}
log('subscribing to channels')
const { status } = this.metrics
const channels = Object.keys(status).reduce(
(channels, name) => [...channels, `latency:${name}`, `status:${name}`],
[]
)
log(`subscribing to ${channels.length} channels: ${channels.join(', ')}`)
this.send({ op: OP.SUBSCRIBE, channels })
}
handle (packet) {
const { op, c: channel, d: data } = packet
if (op !== OP.DATA) {
return
}
const [type, name] = channel.split(':')
this.emit(type, name, data)
}
connect () {
log('connecting')
const ws = this.ws = new WebSocket(this.url)
window.ws = ws
ws.onopen = () => {
log('connected')
this.subscribe()
}
ws.onclose = ({ code, reason }) => {
log(
`ws closed with code ${code} (reason: ${reason || '<none>'}); ` +
`attempting to reconnect in ${this.delay}ms`
)
setTimeout(() => this.connect(), this.delay)
this.delay *= 2
}
ws.onmessage = ({ data }) => {
log(`recv: ${data}`)
const packet = JSON.parse(data)
this.emit('packet', packet)
this.handle(packet)
}
ws.onerror = (err) => {
log(`error: ${err}`)
}
}
}