elstat/priv/frontend/src/ws/client.js

91 lines
2.0 KiB
JavaScript

import NanoEvents from 'nanoevents'
import OP from './op'
import { logger } from '../util'
// Module-level logger used by everything below; created with the name 'ws'
// and the color 'green'.
const log = logger({ name: 'ws', color: 'green' })
/**
 * Reconnecting WebSocket client that re-emits server packets as events.
 *
 * Events emitted:
 * - `packet` -- every successfully parsed packet, before dispatch.
 * - `<op name, lower-cased>` -- for packets on the `incidents` channel, with
 *   the packet's `d` payload as the only argument.
 * - `<type>` -- for packets on a `<type>:<name>` channel, with `(name, d)`.
 *
 * When the socket closes, a reconnect is scheduled with an exponentially
 * growing delay; the delay is capped and resets once a connection opens.
 */
export default class StreamingClient extends NanoEvents {
  /**
   * @param {string} url - Endpoint handed to `new WebSocket(url)`.
   * @param {object} [metrics] - Initial metrics. The keys of `metrics.status`
   *   determine which `latency:*` / `status:*` channels are subscribed to.
   *   When falsy, no subscription is sent.
   * @param {object} [options]
   * @param {number} [options.initialDelay=1000] - First reconnect delay (ms).
   * @param {number} [options.maxDelay=30000] - Upper bound for the delay (ms).
   */
  constructor(url, metrics, { initialDelay = 1000, maxDelay = 30000 } = {}) {
    super()
    this.url = url
    this.metrics = metrics
    this.ws = null
    this.initialDelay = initialDelay
    this.maxDelay = maxDelay
    // Delay (ms) used for the next reconnect attempt.
    this.delay = initialDelay
  }

  /**
   * Serialize `packet` as JSON and send it over the socket.
   *
   * @param {object} packet - JSON-serializable payload.
   * @returns {boolean} `true` if the packet was handed to the socket,
   *   `false` if it was dropped because the socket is missing or not open.
   */
  send(packet) {
    const { ws } = this
    // Before connect() `ws` is null, and sending on a socket that is still
    // connecting throws; drop (and log) instead of crashing the caller.
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      log('not sending packet -- socket is not open:', packet)
      return false
    }
    ws.send(JSON.stringify(packet))
    return true
  }

  /**
   * Subscribe to `incidents` plus `latency:<name>` and `status:<name>` for
   * every key of `metrics.status`. Does nothing when there are no metrics.
   */
  subscribe() {
    if (!this.metrics) {
      log('not subscribing to channels -- no initial metrics')
      return
    }
    log('subscribing to channels')
    const { status } = this.metrics
    const channels = ['incidents']
    // A missing `status` simply yields no per-service channels.
    for (const name of Object.keys(status || {})) {
      channels.push(`latency:${name}`, `status:${name}`)
    }
    log(`subscribing to ${channels.length} channels: ${channels.join(', ')}`)
    this.send({ op: OP.SUBSCRIBE, channels })
  }

  /**
   * Dispatch a parsed packet to listeners.
   *
   * Subscription acknowledgements are ignored. Packets that cannot be
   * routed (non-object, unknown op on `incidents`, missing channel) are
   * logged and dropped rather than throwing.
   *
   * @param {{op: *, c: string, d: *}} packet
   */
  handle(packet) {
    if (!packet || typeof packet !== 'object') {
      log('ignoring non-object packet:', packet)
      return
    }
    const { op, c: channel, d: data } = packet
    if ([OP.SUBSCRIBED, OP.UNSUBSCRIBED].includes(op)) {
      return
    }
    if (channel === 'incidents') {
      // Reverse-lookup the OP name for this op code; it becomes the event name.
      const entry = Object.entries(OP).find(([, value]) => value === op)
      if (!entry) {
        log('ignoring incidents packet with unknown op:', packet)
        return
      }
      this.emit(entry[0].toLowerCase(), data)
      return
    }
    if (typeof channel !== 'string') {
      log('ignoring packet without a channel:', packet)
      return
    }
    const [type, name] = channel.split(':')
    this.emit(type, name, data)
  }

  /**
   * Open the WebSocket and install its handlers. On close, schedules another
   * `connect()` after `this.delay` ms and doubles the delay (up to
   * `this.maxDelay`); on open, the delay resets to `this.initialDelay`.
   */
  connect() {
    const begin = Date.now()
    log('connecting')
    const ws = new WebSocket(this.url)
    this.ws = ws
    // NOTE(review): presumably a debugging aid for the browser console --
    // confirm before removing. Guarded so the class also loads outside a browser.
    if (typeof window !== 'undefined') {
      window.ws = ws
    }

    ws.onopen = () => {
      const took = Date.now() - begin
      log(`connected (took ${took}ms)`)
      // A successful connection ends the failure streak; without this reset
      // the backoff would keep growing across unrelated disconnects.
      this.delay = this.initialDelay
      this.subscribe()
    }

    ws.onclose = ({ code, reason }) => {
      log(
        `ws closed with code ${code} (reason: ${reason || '<none>'}); ` +
          `attempting to reconnect in ${this.delay}ms`
      )
      setTimeout(() => this.connect(), this.delay)
      this.delay = Math.min(this.delay * 2, this.maxDelay)
    }

    ws.onmessage = ({ data }) => {
      let packet
      try {
        packet = JSON.parse(data)
      } catch (err) {
        log('discarding malformed message:', err)
        return
      }
      log('recv:', packet)
      this.emit('packet', packet)
      this.handle(packet)
    }

    ws.onerror = (err) => {
      log('error:', err)
    }
  }
}