feat: sharding (!)

This commit is contained in:
DjDeveloperr 2021-01-24 22:16:10 +05:30
parent 0b995d732d
commit 2dcce6e2bb
7 changed files with 118 additions and 38 deletions

View File

@ -111,7 +111,7 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
if (!this.initialized) {
this.initialized = true
await this.sendIdentify(this.client.forceNewSession)
this.enqueueIdentify(this.client.forceNewSession)
} else {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.sendResume()
@ -134,14 +134,14 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
)
if (d !== true) {
this.debug(`Session was invalidated, deleting from cache`)
await this.cache.delete('session_id')
await this.cache.delete('seq')
await this.cache.delete(`session_id_${this.shards?.join('-') ?? '0'}`)
await this.cache.delete(`seq_${this.shards?.join('-') ?? '0'}`)
this.sessionID = undefined
this.sequenceID = undefined
}
this.timedIdentify = setTimeout(async () => {
this.timedIdentify = null
await this.sendIdentify(!(d as boolean))
this.enqueueIdentify(!(d as boolean))
}, 5000)
break
@ -149,7 +149,7 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
this.heartbeatServerResponded = true
if (s !== null) {
this.sequenceID = s
await this.cache.set('seq', s)
await this.cache.set(`seq_${this.shards?.join('-') ?? '0'}`, s)
}
if (t !== null && t !== undefined) {
this.emit(t as any, d)
@ -167,8 +167,11 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
// this.token = d.token
this.sessionID = d.session_id
this.sequenceID = d.seq
await this.cache.set('seq', d.seq)
await this.cache.set('session_id', this.sessionID)
await this.cache.set(`seq_${this.shards?.join('-') ?? '0'}`, d.seq)
await this.cache.set(
`session_id_${this.shards?.join('-') ?? '0'}`,
this.sessionID
)
this.emit('resume')
break
}
@ -233,10 +236,12 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
this.debug(
'Unknown Close code, probably connection error. Reconnecting in 5s.'
)
if (this.timedIdentify !== null) {
clearTimeout(this.timedIdentify)
this.debug('Timed Identify found. Cleared timeout.')
}
await delay(5000)
await this.reconnect(true)
break
@ -255,7 +260,12 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
error.name = 'ErrorEvent'
console.log(error)
this.emit('error', error, event)
await this.reconnect()
}
private enqueueIdentify(forceNew?: boolean): void {
this.client.shards.enqueueIdentify(
async () => await this.sendIdentify(forceNew)
)
}
private async sendIdentify(forceNewSession?: boolean): Promise<void> {
@ -279,7 +289,9 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
}
if (forceNewSession === undefined || !forceNewSession) {
const sessionIDCached = await this.cache.get('session_id')
const sessionIDCached = await this.cache.get(
`session_id_${this.shards?.join('-') ?? '0'}`
)
if (sessionIDCached !== undefined) {
this.debug(`Found Cached SessionID: ${sessionIDCached}`)
this.sessionID = sessionIDCached
@ -320,12 +332,16 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
throw new Error('Intents not specified')
if (this.sessionID === undefined) {
this.sessionID = await this.cache.get('session_id')
if (this.sessionID === undefined) return await this.sendIdentify()
this.sessionID = await this.cache.get(
`session_id_${this.shards?.join('-') ?? '0'}`
)
if (this.sessionID === undefined) return this.enqueueIdentify()
}
this.debug(`Preparing to resume with Session: ${this.sessionID}`)
if (this.sequenceID === undefined) {
const cached = await this.cache.get('seq')
const cached = await this.cache.get(
`seq_${this.shards?.join('-') ?? '0'}`
)
if (cached !== undefined)
this.sequenceID = typeof cached === 'string' ? parseInt(cached) : cached
}
@ -389,11 +405,13 @@ export class Gateway extends HarmonyEventEmitter<GatewayTypedEvents> {
async reconnect(forceNew?: boolean): Promise<void> {
this.emit('reconnecting')
clearInterval(this.heartbeatIntervalID)
if (forceNew === true) {
await this.cache.delete('session_id')
await this.cache.delete('seq')
await this.cache.delete(`session_id_${this.shards?.join('-') ?? '0'}`)
await this.cache.delete(`seq_${this.shards?.join('-') ?? '0'}`)
}
this.close(1000, RECONNECT_REASON)
this.initWebsocket()
}

View File

@ -14,7 +14,7 @@ import { Extension } from './extensions.ts'
import { SlashClient } from './slashClient.ts'
import { Interaction } from '../structures/slash.ts'
import { SlashModule } from './slashModule.ts'
import type { ShardManager } from './shard.ts'
import { ShardManager } from './shard.ts'
import { Application } from '../structures/application.ts'
import { Invite } from '../structures/invite.ts'
import { INVITE } from '../types/endpoint.ts'
@ -67,7 +67,7 @@ export interface ClientOptions {
fetchGatewayInfo?: boolean
/** ADVANCED: Shard ID to launch on */
shard?: number
/** Shard count. Set to 'auto' for automatic sharding */
/** ADVACNED: Shard count. */
shardCount?: number | 'auto'
}
@ -75,8 +75,6 @@ export interface ClientOptions {
* Discord Client.
*/
export class Client extends HarmonyEventEmitter<ClientEvents> {
/** Gateway object */
gateway: Gateway
/** REST Manager - used to make all requests */
rest: RESTManager
/** User which Client logs in to, undefined until logs in */
@ -133,11 +131,11 @@ export class Client extends HarmonyEventEmitter<ClientEvents> {
_id?: string
/** Shard on which this Client is */
shard: number = 0
shard?: number
/** Shard Count */
shardCount: number | 'auto' = 1
shardCount: number | 'auto' = 'auto'
/** Shard Manager of this Client if Sharded */
shards?: ShardManager
shards: ShardManager
/** Collectors set */
collectors: Set<Collector> = new Set()
@ -151,11 +149,17 @@ export class Client extends HarmonyEventEmitter<ClientEvents> {
}
}
get gateway(): Gateway {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
return this.shards.list.get('0') as Gateway
}
constructor(options: ClientOptions = {}) {
super()
this._id = options.id
this.token = options.token
this.intents = options.intents
this.shards = new ShardManager(this)
this.forceNewSession = options.forceNewSession
if (options.cache !== undefined) this.cache = options.cache
if (options.presence !== undefined)
@ -220,7 +224,6 @@ export class Client extends HarmonyEventEmitter<ClientEvents> {
if (options.restOptions !== undefined)
Object.assign(restOptions, options.restOptions)
this.rest = new RESTManager(restOptions)
this.gateway = new Gateway(this)
}
/**
@ -301,9 +304,11 @@ export class Client extends HarmonyEventEmitter<ClientEvents> {
} else throw new Error('No Gateway Intents were provided')
this.rest.token = token
this.gateway.token = token
this.gateway.intents = intents
this.gateway.initWebsocket()
if (this.shard !== undefined) {
if (typeof this.shardCount === 'number')
this.shards.cachedShardCount = this.shardCount
await this.shards.launch(this.shard)
} else await this.shards.connect()
return this.waitFor('ready', () => true).then(() => this)
}

View File

@ -1,9 +1,10 @@
import { Collection } from '../utils/collection.ts'
import { Client } from './client.ts'
import type { Client } from './client.ts'
import { RESTManager } from './rest.ts'
import { Gateway } from '../gateway/index.ts'
import { HarmonyEventEmitter } from '../utils/events.ts'
import { GatewayEvents } from '../types/gateway.ts'
import { delay } from '../utils/delay.ts'
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type ShardManagerEvents = {
@ -18,6 +19,8 @@ export class ShardManager extends HarmonyEventEmitter<ShardManagerEvents> {
list: Collection<string, Gateway> = new Collection()
client: Client
cachedShardCount?: number
queueProcessing: boolean = false
queue: CallableFunction[] = []
get rest(): RESTManager {
return this.client.rest
@ -28,6 +31,32 @@ export class ShardManager extends HarmonyEventEmitter<ShardManagerEvents> {
this.client = client
}
debug(msg: string): void {
this.client.debug('Shards', msg)
}
enqueueIdentify(fn: CallableFunction): ShardManager {
this.queue.push(fn)
// eslint-disable-next-line @typescript-eslint/no-floating-promises
if (!this.queueProcessing) this.processQueue()
return this
}
private async processQueue(): Promise<void> {
if (this.queueProcessing || this.queue.length === 0) return
this.queueProcessing = true
const item = this.queue[0]
await item()
this.queue.shift()
await delay(5000)
this.queueProcessing = false
if (this.queue.length === 0) {
this.queueProcessing = false
} else {
await this.processQueue()
}
}
async getShardCount(): Promise<number> {
let shardCount: number
if (this.cachedShardCount !== undefined) shardCount = this.cachedShardCount
@ -46,10 +75,14 @@ export class ShardManager extends HarmonyEventEmitter<ShardManagerEvents> {
if (this.list.has(id.toString()) === true)
throw new Error(`Shard ${id} already launched`)
this.debug(`Launching Shard: ${id}`)
const shardCount = await this.getShardCount()
const gw = new Gateway(this.client, [Number(id), shardCount])
gw.token = this.client.token
gw.intents = this.client.intents
this.list.set(id.toString(), gw)
gw.initWebsocket()
this.emit('launch', id)
@ -65,11 +98,24 @@ export class ShardManager extends HarmonyEventEmitter<ShardManagerEvents> {
return gw.waitFor(GatewayEvents.Ready, () => true).then(() => this)
}
async start(): Promise<ShardManager> {
/** Launches all Shards */
async connect(): Promise<ShardManager> {
const shardCount = await this.getShardCount()
for (let i = 0; i <= shardCount; i++) {
this.client.shardCount = shardCount
this.debug(`Launching ${shardCount} shard${shardCount === 1 ? '' : 's'}...`)
const startTime = Date.now()
for (let i = 0; i < shardCount; i++) {
await this.launch(i)
}
const endTime = Date.now()
const diff = endTime - startTime
this.debug(
`Launched ${shardCount} shards! Time taken: ${Math.floor(diff / 1000)}s`
)
return this
}
get(id: number): Gateway | undefined {
return this.list.get(id.toString())
}
}

View File

@ -181,6 +181,11 @@ export class Guild extends Base {
approximatePresenceCount?: number
bans: GuildBans
/** Get Shard ID of Guild on which it is */
get shardID(): number {
return Number((BigInt(this.id) << 22n) % BigInt(this.client.shardCount))
}
constructor(client: Client, data: GuildPayload) {
super(client, data)
this.id = data.id
@ -301,7 +306,7 @@ export class Guild extends Base {
timeout: number = 60000
): Promise<Guild> {
return await new Promise((resolve, reject) => {
this.client.gateway?.requestMembers(this.id, options)
this.client.shards.get(this.shardID)?.requestMembers(this.id, options)
if (!wait) return resolve(this)
else {
let chunked = false

View File

@ -57,7 +57,9 @@ export class VoiceChannel extends Channel {
if (done >= 2) resolve(vcdata)
}
this.client.gateway?.updateVoiceState(this.guild.id, this.id, options)
this.client.shards
.get(this.guild.shardID)
?.updateVoiceState(this.guild.id, this.id, options)
this.client.on('voiceStateAdd', onVoiceStateAdd)
this.client.on('voiceServerUpdate', onVoiceServerUpdate)
@ -77,7 +79,9 @@ export class VoiceChannel extends Channel {
}
leave(): void {
this.client.gateway?.updateVoiceState(this.guild.id, undefined)
this.client.shards
.get(this.guild.shardID)
?.updateVoiceState(this.guild.id, undefined)
}
readFromData(data: GuildVoiceChannelPayload): void {

View File

@ -32,10 +32,6 @@ client.on('messageUpdate', (before, after) => {
console.log(`After: ${after.author.tag}: ${after.content}`)
})
client.on('messageUpdateUncached', (msg) => {
console.log(`Message: ${msg.author.tag}: ${msg.content}`)
})
client.on('guildMemberAdd', (member) => {
console.log(`Member Join: ${member.user.tag}`)
})
@ -73,7 +69,7 @@ client.on('inviteDelete', (invite: Invite) => {
console.log(`Invite Delete: ${invite.code}`)
})
client.on('inviteDeleteUncached', (invite: Invite) => {
client.on('inviteDeleteUncached', (invite) => {
console.log(invite)
})

View File

@ -23,7 +23,8 @@ const client = new Client({
// cache: new RedisCacheAdapter({
// hostname: '127.0.0.1',
// port: 6379
// }) // Defaults to in-memory Caching
// }), // Defaults to in-memory Caching
// shardCount: 2
})
client.on('ready', () => {
@ -154,6 +155,11 @@ client.on('messageCreate', async (msg: Message) => {
msg.channel.send({
file: new MessageAttachment('hello.txt', 'hello world')
})
} else if (msg.content === '!join') {
if (msg.member === undefined) return
const vs = await msg.guild?.voiceStates.get(msg.member.id)
if (typeof vs !== 'object') return
vs.channel?.join()
}
})
@ -166,7 +172,7 @@ client.on('messageReactionRemove', (reaction, user) => {
}
})
client.connect(TOKEN, Intents.All)
client.connect(TOKEN, Intents.None)
// OLD: Was a way to reproduce reconnect infinite loop
// setTimeout(() => {