diff --git a/src/gateway/index.ts b/src/gateway/index.ts index 9dc8031..955b61d 100644 --- a/src/gateway/index.ts +++ b/src/gateway/index.ts @@ -111,7 +111,7 @@ export class Gateway extends HarmonyEventEmitter { if (!this.initialized) { this.initialized = true - await this.sendIdentify(this.client.forceNewSession) + this.enqueueIdentify(this.client.forceNewSession) } else { // eslint-disable-next-line @typescript-eslint/no-floating-promises this.sendResume() @@ -134,14 +134,14 @@ export class Gateway extends HarmonyEventEmitter { ) if (d !== true) { this.debug(`Session was invalidated, deleting from cache`) - await this.cache.delete('session_id') - await this.cache.delete('seq') + await this.cache.delete(`session_id_${this.shards?.join('-') ?? '0'}`) + await this.cache.delete(`seq_${this.shards?.join('-') ?? '0'}`) this.sessionID = undefined this.sequenceID = undefined } this.timedIdentify = setTimeout(async () => { this.timedIdentify = null - await this.sendIdentify(!(d as boolean)) + this.enqueueIdentify(!(d as boolean)) }, 5000) break @@ -149,7 +149,7 @@ export class Gateway extends HarmonyEventEmitter { this.heartbeatServerResponded = true if (s !== null) { this.sequenceID = s - await this.cache.set('seq', s) + await this.cache.set(`seq_${this.shards?.join('-') ?? '0'}`, s) } if (t !== null && t !== undefined) { this.emit(t as any, d) @@ -167,8 +167,11 @@ export class Gateway extends HarmonyEventEmitter { // this.token = d.token this.sessionID = d.session_id this.sequenceID = d.seq - await this.cache.set('seq', d.seq) - await this.cache.set('session_id', this.sessionID) + await this.cache.set(`seq_${this.shards?.join('-') ?? '0'}`, d.seq) + await this.cache.set( + `session_id_${this.shards?.join('-') ?? '0'}`, + this.sessionID + ) this.emit('resume') break } @@ -233,10 +236,12 @@ export class Gateway extends HarmonyEventEmitter { this.debug( 'Unknown Close code, probably connection error. Reconnecting in 5s.' ) + if (this.timedIdentify !== null) { clearTimeout(this.timedIdentify) this.debug('Timed Identify found. Cleared timeout.') } + await delay(5000) await this.reconnect(true) break @@ -255,7 +260,12 @@ export class Gateway extends HarmonyEventEmitter { error.name = 'ErrorEvent' console.log(error) this.emit('error', error, event) - await this.reconnect() + } + + private enqueueIdentify(forceNew?: boolean): void { + this.client.shards.enqueueIdentify( + async () => await this.sendIdentify(forceNew) + ) } private async sendIdentify(forceNewSession?: boolean): Promise { @@ -279,7 +289,9 @@ export class Gateway extends HarmonyEventEmitter { } if (forceNewSession === undefined || !forceNewSession) { - const sessionIDCached = await this.cache.get('session_id') + const sessionIDCached = await this.cache.get( + `session_id_${this.shards?.join('-') ?? '0'}` + ) if (sessionIDCached !== undefined) { this.debug(`Found Cached SessionID: ${sessionIDCached}`) this.sessionID = sessionIDCached @@ -320,12 +332,16 @@ export class Gateway extends HarmonyEventEmitter { throw new Error('Intents not specified') if (this.sessionID === undefined) { - this.sessionID = await this.cache.get('session_id') - if (this.sessionID === undefined) return await this.sendIdentify() + this.sessionID = await this.cache.get( + `session_id_${this.shards?.join('-') ?? '0'}` + ) + if (this.sessionID === undefined) return this.enqueueIdentify() } this.debug(`Preparing to resume with Session: ${this.sessionID}`) if (this.sequenceID === undefined) { - const cached = await this.cache.get('seq') + const cached = await this.cache.get( + `seq_${this.shards?.join('-') ?? '0'}` + ) if (cached !== undefined) this.sequenceID = typeof cached === 'string' ? parseInt(cached) : cached } @@ -389,11 +405,13 @@ export class Gateway extends HarmonyEventEmitter { async reconnect(forceNew?: boolean): Promise { this.emit('reconnecting') + clearInterval(this.heartbeatIntervalID) if (forceNew === true) { - await this.cache.delete('session_id') - await this.cache.delete('seq') + await this.cache.delete(`session_id_${this.shards?.join('-') ?? '0'}`) + await this.cache.delete(`seq_${this.shards?.join('-') ?? '0'}`) } + this.close(1000, RECONNECT_REASON) this.initWebsocket() } diff --git a/src/models/client.ts b/src/models/client.ts index 7b28f84..926cc2c 100644 --- a/src/models/client.ts +++ b/src/models/client.ts @@ -14,7 +14,7 @@ import { Extension } from './extensions.ts' import { SlashClient } from './slashClient.ts' import { Interaction } from '../structures/slash.ts' import { SlashModule } from './slashModule.ts' -import type { ShardManager } from './shard.ts' +import { ShardManager } from './shard.ts' import { Application } from '../structures/application.ts' import { Invite } from '../structures/invite.ts' import { INVITE } from '../types/endpoint.ts' @@ -67,7 +67,7 @@ export interface ClientOptions { fetchGatewayInfo?: boolean /** ADVANCED: Shard ID to launch on */ shard?: number - /** Shard count. Set to 'auto' for automatic sharding */ + /** ADVACNED: Shard count. */ shardCount?: number | 'auto' } @@ -75,8 +75,6 @@ export interface ClientOptions { * Discord Client. */ export class Client extends HarmonyEventEmitter { - /** Gateway object */ - gateway: Gateway /** REST Manager - used to make all requests */ rest: RESTManager /** User which Client logs in to, undefined until logs in */ @@ -133,11 +131,11 @@ export class Client extends HarmonyEventEmitter { _id?: string /** Shard on which this Client is */ - shard: number = 0 + shard?: number /** Shard Count */ - shardCount: number | 'auto' = 1 + shardCount: number | 'auto' = 'auto' /** Shard Manager of this Client if Sharded */ - shards?: ShardManager + shards: ShardManager /** Collectors set */ collectors: Set = new Set() @@ -151,11 +149,17 @@ export class Client extends HarmonyEventEmitter { } } + get gateway(): Gateway { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion + return this.shards.list.get('0') as Gateway + } + constructor(options: ClientOptions = {}) { super() this._id = options.id this.token = options.token this.intents = options.intents + this.shards = new ShardManager(this) this.forceNewSession = options.forceNewSession if (options.cache !== undefined) this.cache = options.cache if (options.presence !== undefined) @@ -220,7 +224,6 @@ export class Client extends HarmonyEventEmitter { if (options.restOptions !== undefined) Object.assign(restOptions, options.restOptions) this.rest = new RESTManager(restOptions) - this.gateway = new Gateway(this) } /** @@ -301,9 +304,11 @@ export class Client extends HarmonyEventEmitter { } else throw new Error('No Gateway Intents were provided') this.rest.token = token - this.gateway.token = token - this.gateway.intents = intents - this.gateway.initWebsocket() + if (this.shard !== undefined) { + if (typeof this.shardCount === 'number') + this.shards.cachedShardCount = this.shardCount + await this.shards.launch(this.shard) + } else await this.shards.connect() return this.waitFor('ready', () => true).then(() => this) } diff --git a/src/models/shard.ts b/src/models/shard.ts index cd6601b..80cd649 100644 --- a/src/models/shard.ts +++ b/src/models/shard.ts @@ -1,9 +1,10 @@ import { Collection } from '../utils/collection.ts' -import { Client } from './client.ts' +import type { Client } from './client.ts' import { RESTManager } from './rest.ts' import { Gateway } from '../gateway/index.ts' import { HarmonyEventEmitter } from '../utils/events.ts' import { GatewayEvents } from '../types/gateway.ts' +import { delay } from '../utils/delay.ts' // eslint-disable-next-line @typescript-eslint/consistent-type-definitions export type ShardManagerEvents = { @@ -18,6 +19,8 @@ export class ShardManager extends HarmonyEventEmitter { list: Collection = new Collection() client: Client cachedShardCount?: number + queueProcessing: boolean = false + queue: CallableFunction[] = [] get rest(): RESTManager { return this.client.rest @@ -28,6 +31,32 @@ export class ShardManager extends HarmonyEventEmitter { this.client = client } + debug(msg: string): void { + this.client.debug('Shards', msg) + } + + enqueueIdentify(fn: CallableFunction): ShardManager { + this.queue.push(fn) + // eslint-disable-next-line @typescript-eslint/no-floating-promises + if (!this.queueProcessing) this.processQueue() + return this + } + + private async processQueue(): Promise { + if (this.queueProcessing || this.queue.length === 0) return + this.queueProcessing = true + const item = this.queue[0] + await item() + this.queue.shift() + await delay(5000) + this.queueProcessing = false + if (this.queue.length === 0) { + this.queueProcessing = false + } else { + await this.processQueue() + } + } + async getShardCount(): Promise { let shardCount: number if (this.cachedShardCount !== undefined) shardCount = this.cachedShardCount @@ -46,10 +75,14 @@ export class ShardManager extends HarmonyEventEmitter { if (this.list.has(id.toString()) === true) throw new Error(`Shard ${id} already launched`) + this.debug(`Launching Shard: ${id}`) const shardCount = await this.getShardCount() const gw = new Gateway(this.client, [Number(id), shardCount]) + gw.token = this.client.token + gw.intents = this.client.intents this.list.set(id.toString(), gw) + gw.initWebsocket() this.emit('launch', id) @@ -65,11 +98,24 @@ export class ShardManager extends HarmonyEventEmitter { return gw.waitFor(GatewayEvents.Ready, () => true).then(() => this) } - async start(): Promise { + /** Launches all Shards */ + async connect(): Promise { const shardCount = await this.getShardCount() - for (let i = 0; i <= shardCount; i++) { + this.client.shardCount = shardCount + this.debug(`Launching ${shardCount} shard${shardCount === 1 ? '' : 's'}...`) + const startTime = Date.now() + for (let i = 0; i < shardCount; i++) { await this.launch(i) } + const endTime = Date.now() + const diff = endTime - startTime + this.debug( + `Launched ${shardCount} shards! Time taken: ${Math.floor(diff / 1000)}s` + ) return this } + + get(id: number): Gateway | undefined { + return this.list.get(id.toString()) + } } diff --git a/src/structures/guild.ts b/src/structures/guild.ts index f438b4d..d2bbfc3 100644 --- a/src/structures/guild.ts +++ b/src/structures/guild.ts @@ -181,6 +181,11 @@ export class Guild extends Base { approximatePresenceCount?: number bans: GuildBans + /** Get Shard ID of Guild on which it is */ + get shardID(): number { + return Number((BigInt(this.id) << 22n) % BigInt(this.client.shardCount)) + } + constructor(client: Client, data: GuildPayload) { super(client, data) this.id = data.id @@ -301,7 +306,7 @@ export class Guild extends Base { timeout: number = 60000 ): Promise { return await new Promise((resolve, reject) => { - this.client.gateway?.requestMembers(this.id, options) + this.client.shards.get(this.shardID)?.requestMembers(this.id, options) if (!wait) return resolve(this) else { let chunked = false diff --git a/src/structures/guildVoiceChannel.ts b/src/structures/guildVoiceChannel.ts index 4308c8c..49dcd2a 100644 --- a/src/structures/guildVoiceChannel.ts +++ b/src/structures/guildVoiceChannel.ts @@ -57,7 +57,9 @@ export class VoiceChannel extends Channel { if (done >= 2) resolve(vcdata) } - this.client.gateway?.updateVoiceState(this.guild.id, this.id, options) + this.client.shards + .get(this.guild.shardID) + ?.updateVoiceState(this.guild.id, this.id, options) this.client.on('voiceStateAdd', onVoiceStateAdd) this.client.on('voiceServerUpdate', onVoiceServerUpdate) @@ -77,7 +79,9 @@ export class VoiceChannel extends Channel { } leave(): void { - this.client.gateway?.updateVoiceState(this.guild.id, undefined) + this.client.shards + .get(this.guild.shardID) + ?.updateVoiceState(this.guild.id, undefined) } readFromData(data: GuildVoiceChannelPayload): void { diff --git a/src/test/cmd.ts b/src/test/cmd.ts index db58ca4..5063198 100644 --- a/src/test/cmd.ts +++ b/src/test/cmd.ts @@ -32,10 +32,6 @@ client.on('messageUpdate', (before, after) => { console.log(`After: ${after.author.tag}: ${after.content}`) }) -client.on('messageUpdateUncached', (msg) => { - console.log(`Message: ${msg.author.tag}: ${msg.content}`) -}) - client.on('guildMemberAdd', (member) => { console.log(`Member Join: ${member.user.tag}`) }) @@ -73,7 +69,7 @@ client.on('inviteDelete', (invite: Invite) => { console.log(`Invite Delete: ${invite.code}`) }) -client.on('inviteDeleteUncached', (invite: Invite) => { +client.on('inviteDeleteUncached', (invite) => { console.log(invite) }) diff --git a/src/test/index.ts b/src/test/index.ts index e91db95..aa30522 100644 --- a/src/test/index.ts +++ b/src/test/index.ts @@ -23,7 +23,8 @@ const client = new Client({ // cache: new RedisCacheAdapter({ // hostname: '127.0.0.1', // port: 6379 - // }) // Defaults to in-memory Caching + // }), // Defaults to in-memory Caching + // shardCount: 2 }) client.on('ready', () => { @@ -154,6 +155,11 @@ client.on('messageCreate', async (msg: Message) => { msg.channel.send({ file: new MessageAttachment('hello.txt', 'hello world') }) + } else if (msg.content === '!join') { + if (msg.member === undefined) return + const vs = await msg.guild?.voiceStates.get(msg.member.id) + if (typeof vs !== 'object') return + vs.channel?.join() } }) @@ -166,7 +172,7 @@ client.on('messageReactionRemove', (reaction, user) => { } }) -client.connect(TOKEN, Intents.All) +client.connect(TOKEN, Intents.None) // OLD: Was a way to reproduce reconnect infinite loop // setTimeout(() => {