import NanoEvents from 'nanoevents'
import OP from './op'
import { logger } from '../util'

const log = logger({ name: 'ws', color: 'green' })

// Reconnect backoff bounds, in milliseconds.
const INITIAL_DELAY = 1000
const MAX_DELAY = 30000

/**
 * WebSocket client that subscribes to `latency:<name>` and `status:<name>`
 * channels and re-emits incoming data packets as events.
 *
 * Events emitted:
 *  - `packet` (packet): every successfully parsed incoming message.
 *  - `<type>` (name, data): for packets whose `op` is `OP.DATA`, where
 *    `<type>` and `name` come from splitting the packet's channel on its
 *    first colon.
 *
 * The client reconnects automatically whenever the socket closes, using an
 * exponential backoff that starts at 1s, is capped at 30s, and is reset after
 * each successful connection.
 */
export default class StreamingClient extends NanoEvents {
  /**
   * @param {string} url - WebSocket endpoint passed to `new WebSocket()`.
   * @param {object} [metrics] - Initial metrics; the keys of `metrics.status`
   *   determine which channels are subscribed to. When falsy, no
   *   subscription is made.
   */
  constructor (url, metrics) {
    super()
    this.url = url
    this.metrics = metrics
    this.ws = null
    this.delay = INITIAL_DELAY
  }

  /**
   * Serialize `packet` as JSON and send it over the socket.
   * If the socket does not exist or is not open, the packet is dropped and a
   * message is logged instead of throwing.
   *
   * @param {object} packet - JSON-serializable payload.
   */
  send (packet) {
    const { ws } = this
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      log('not sending packet -- socket is not open')
      return
    }
    ws.send(JSON.stringify(packet))
  }

  /**
   * Send a single `OP.SUBSCRIBE` packet listing a `latency:` and a `status:`
   * channel for every key of `this.metrics.status`.
   * Does nothing (other than logging) when no metrics were provided.
   */
  subscribe () {
    if (!this.metrics) {
      log('not subscribing to channels -- no initial metrics')
      return
    }

    log('subscribing to channels')

    // Tolerate a metrics object without a `status` map.
    const status = this.metrics.status || {}

    // Build the list with a plain push loop; spreading the accumulator on
    // every iteration would copy it each time.
    const channels = []
    for (const name of Object.keys(status)) {
      channels.push(`latency:${name}`, `status:${name}`)
    }

    log(`subscribing to ${channels.length} channels: ${channels.join(', ')}`)
    this.send({ op: OP.SUBSCRIBE, channels })
  }

  /**
   * Dispatch an incoming packet. Only `OP.DATA` packets are handled; their
   * channel (`c`) is split on the first colon into an event type and a name,
   * and the event is emitted with `(name, data)`.
   *
   * @param {{ op: *, c: string, d: * }} packet - Parsed incoming message.
   */
  handle (packet) {
    const { op, c: channel, d: data } = packet

    if (op !== OP.DATA) {
      return
    }

    if (typeof channel !== 'string') {
      log('ignoring data packet without a channel:', packet)
      return
    }

    // Split on the first colon only so that names which themselves contain a
    // colon are passed through intact. A channel without any colon yields an
    // undefined name.
    const separator = channel.indexOf(':')
    const type = separator === -1 ? channel : channel.slice(0, separator)
    const name = separator === -1 ? undefined : channel.slice(separator + 1)

    this.emit(type, name, data)
  }

  /**
   * Open the WebSocket and install its handlers. On open, the channels are
   * subscribed; on close, a reconnect is scheduled after `this.delay` ms and
   * the delay is doubled (up to the cap) for the next attempt.
   */
  connect () {
    const begin = Date.now()
    log('connecting')

    const ws = this.ws = new WebSocket(this.url)
    // Exposes the current socket globally; kept because code outside this
    // file may rely on it.
    window.ws = ws

    ws.onopen = () => {
      const took = Date.now() - begin
      log(`connected (took ${took}ms)`)
      // A successful connection ends the current backoff sequence.
      this.delay = INITIAL_DELAY
      this.subscribe()
    }

    ws.onclose = ({ code, reason }) => {
      const delay = this.delay
      log(
        `ws closed with code ${code} (reason: ${reason || ''}); ` +
        `attempting to reconnect in ${delay}ms`
      )
      setTimeout(() => this.connect(), delay)
      // Exponential backoff, bounded so retries never become arbitrarily rare.
      this.delay = Math.min(delay * 2, MAX_DELAY)
    }

    ws.onmessage = ({ data }) => {
      let packet
      try {
        packet = JSON.parse(data)
      } catch (err) {
        // A malformed frame must not throw out of the event handler.
        log('error: failed to parse message:', err)
        return
      }

      log('recv:', packet)
      this.emit('packet', packet)
      this.handle(packet)
    }

    ws.onerror = (err) => {
      log('error:', err)
    }
  }
}