diff --git a/Dockerfile b/Dockerfile index 022002a..53d0a3c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,21 @@ -FROM node:alpine -WORKDIR /app +FROM oven/bun:1.3 -COPY ./package.json /app/package.json -COPY ./pnpm-lock.yaml /app/pnpm-lock.yaml -RUN npm install -g pnpm +RUN apt-get update && apt-get install -y \ + build-essential \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* -RUN pnpm install -RUN pnpm build +RUN npm install -g pnpm COPY . /app -WORKDIR /app -COPY ./.env /app/.env -WORKDIR /app \ No newline at end of file +WORKDIR /app/Ayako/packages/Utility +RUN rm -rf ./dist +RUN pnpm install +RUN pnpm run build + +WORKDIR /app/Ayako/packages/Gateway +COPY ./.env /app/Ayako/packages/Gateway/.env +RUN rm -rf ./dist +RUN pnpm install +RUN pnpm run build diff --git a/package.json b/package.json index d512149..751bb82 100644 --- a/package.json +++ b/package.json @@ -5,9 +5,9 @@ "type": "module", "scripts": { "build": "swc src -d dist --strip-leading-paths --copy-files", - "dev": "pnpm build && node --enable-source-maps --experimental-wasm-modules --experimental-json-modules ./dist/index.js --debug --warn --dev --debug-db --local", + "dev": "pnpm build && bun ./dist/index.js --debug --warn --dev --debug-db --local", "lint": "pnpx eslint 'src/**/*.ts' --fix", - "run": "node --max-old-space-size=64 --no-deprecation --no-warnings --experimental-json-modules ./dist/index.js", + "run": "bun ./dist/index.js", "start": "pnpm run run", "watch": "swc src -d dist --strip-leading-paths --copy-files --watch" }, @@ -19,7 +19,6 @@ "discord-hybrid-sharding": "^3.0.1", "dotenv": "^17.2.3", "glob": "^13.0.0", - "ioredis": "^5.9.1", "node-schedule": "^2.1.1", "prom-client": "^15.1.3" }, @@ -32,6 +31,7 @@ "@swc/core": "1.15.8", "@total-typescript/ts-reset": "^0.6.1", "@types/glob": "^9.0.0", + "@types/bun": "latest", "@types/node": "^25.0.7", "@types/node-schedule": "^2.1.8", "@typescript-eslint/eslint-plugin": "^8.53.0", diff --git a/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts b/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts index 6f4b071..34fa716 100644 --- a/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts +++ b/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts @@ -6,7 +6,6 @@ import { type GatewayAutoModerationRuleUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -15,7 +14,6 @@ export default { data: GatewayAutoModerationActionExecutionDispatchData, ) => { firstGuildInteraction(data.guild_id); - if (data.channel_id) firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.AutoModerationRuleCreate]: async ( diff --git a/src/BaseClient/Bot/CacheHandlers/Channel.ts b/src/BaseClient/Bot/CacheHandlers/Channel.ts index 0a6d9ed..56cf1dc 100644 --- a/src/BaseClient/Bot/CacheHandlers/Channel.ts +++ b/src/BaseClient/Bot/CacheHandlers/Channel.ts @@ -6,8 +6,6 @@ import { type GatewayChannelUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; -import requestChannelPins from '../../../Util/requestChannelPins.js'; import redis from '../Cache.js'; export default { @@ -20,35 +18,27 @@ export default { redis.pins.delAll(data.id); redis.channelStatus.del(data.guild_id, data.id); - const pipeline = redis.cacheDb.pipeline(); - const messages = await redis.cacheDb.hgetall(redis.messages.keystore(data.guild_id)); - - pipeline.hdel( + const messageKeys = await redis.cacheDb.hscanKeys( redis.messages.keystore(data.guild_id), - ...Object.keys(messages).filter((m) => m.includes(data.id)), + `*${data.id}*`, ); - pipeline.del(...Object.keys(messages).filter((m) => m.includes(data.id))); - await pipeline.exec(); - }, - - [GatewayDispatchEvents.ChannelPinsUpdate]: async (data: GatewayChannelPinsUpdateDispatchData) => { - if (!data.guild_id) return; - const success = await firstChannelInteraction(data.channel_id, data.guild_id); - if (success) return; + if (messageKeys.length === 0) return; - await requestChannelPins(data.channel_id, data.guild_id); + const pipeline = redis.cacheDb.pipeline(); + pipeline.hdel(redis.messages.keystore(data.guild_id), ...messageKeys); + pipeline.del(...messageKeys); + await pipeline.exec(); }, + [GatewayDispatchEvents.ChannelPinsUpdate]: async (_: GatewayChannelPinsUpdateDispatchData) => {}, + [GatewayDispatchEvents.ChannelUpdate]: async (data: GatewayChannelUpdateDispatchData) => { - firstChannelInteraction(data.id, data.guild_id); redis.channels.set(data); }, // eslint-disable-next-line @typescript-eslint/naming-convention VOICE_CHANNEL_STATUS_UPDATE: async (data: { status: string; id: string; guild_id: string }) => { - firstChannelInteraction(data.id, data.guild_id); - if (!data.status?.length) { redis.channelStatus.del(data.guild_id, data.id); return; @@ -63,8 +53,6 @@ export default { guild_id: string; channels: { status: string; id: string }[]; }) => { - await Promise.all(data.channels.map(async (c) => firstChannelInteraction(c.id, data.guild_id))); - await redis.channelStatus.delAll(data.guild_id); data.channels.forEach((c) => { diff --git a/src/BaseClient/Bot/CacheHandlers/Guilds.ts b/src/BaseClient/Bot/CacheHandlers/Guilds.ts index 54d9139..78c614f 100644 --- a/src/BaseClient/Bot/CacheHandlers/Guilds.ts +++ b/src/BaseClient/Bot/CacheHandlers/Guilds.ts @@ -1,3 +1,4 @@ +import type { ChainableCommanderInterface } from '@ayako/utility'; import { GatewayDispatchEvents, type GatewayGuildAuditLogEntryCreateDispatchData, @@ -27,7 +28,8 @@ import { type GuildMemberFlags, } from 'discord-api-types/v10'; -import firstGuildInteraction, { tasks } from '../../../Util/firstGuildInteraction.js'; +import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; import { cache } from '../Client.js'; @@ -66,7 +68,7 @@ export default { const rGuild = redis.guilds.apiToR(data); if (rGuild) { const guildJson = JSON.stringify(rGuild); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:guilds:${guildId}:current`, guildJson, 'EX', 604800); p.hset('keystore:guilds', `cache:guilds:${guildId}`, 0); }); @@ -84,14 +86,14 @@ export default { if (rMember) { const memberJson = JSON.stringify(rMember); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:members:${guildId}:${userId}:current`, memberJson, 'EX', 604800); p.hset(`keystore:members:${guildId}`, `cache:members:${guildId}:${userId}`, 0); }); } if (rUser) { const userJson = JSON.stringify(rUser); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:users:${userId}:current`, userJson, 'EX', 604800); }); } @@ -104,7 +106,7 @@ export default { (data.channels as unknown[])[i] = undefined; if (rChannel) { const channelJson = JSON.stringify(rChannel); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:channels:${channel.id}:current`, channelJson, 'EX', 604800); p.hset(`keystore:channels:${guildId}`, `cache:channels:${channel.id}`, 0); }); @@ -118,7 +120,7 @@ export default { (data.roles as unknown[])[i] = undefined; if (rRole) { const roleJson = JSON.stringify(rRole); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:roles:${role.id}:current`, roleJson, 'EX', 604800); p.hset(`keystore:roles:${guildId}`, `cache:roles:${role.id}`, 0); }); @@ -133,7 +135,7 @@ export default { (data.emojis as unknown[])[i] = undefined; if (rEmoji) { const emojiJson = JSON.stringify(rEmoji); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:emojis:${emoji.id}:current`, emojiJson, 'EX', 604800); p.hset(`keystore:emojis:${guildId}`, `cache:emojis:${emoji.id}`, 0); }); @@ -147,7 +149,7 @@ export default { (data.stickers as unknown[])[i] = undefined; if (rSticker) { const stickerJson = JSON.stringify(rSticker); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:stickers:${sticker.id}:current`, stickerJson, 'EX', 604800); p.hset(`keystore:stickers:${guildId}`, `cache:stickers:${sticker.id}`, 0); }); @@ -161,7 +163,7 @@ export default { (data.soundboard_sounds as unknown[])[i] = undefined; if (rSound) { const soundJson = JSON.stringify(rSound); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:soundboards:${sound.sound_id}:current`, soundJson, 'EX', 604800); }); } @@ -176,7 +178,7 @@ export default { (data.voice_states as unknown[])[i] = undefined; if (rVoice) { const voiceJson = JSON.stringify(rVoice); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:voices:${guildId}:${voice.user_id}:current`, voiceJson, 'EX', 604800); p.hset(`keystore:voices:${guildId}`, `cache:voices:${guildId}:${voice.user_id}`, 0); }); @@ -191,7 +193,7 @@ export default { (data.threads as unknown[])[i] = undefined; if (rThread) { const threadJson = JSON.stringify(rThread); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:threads:${thread.id}:current`, threadJson, 'EX', 604800); p.hset(`keystore:threads:${guildId}`, `cache:threads:${thread.id}`, 0); }); @@ -205,7 +207,7 @@ export default { (data.guild_scheduled_events as unknown[])[i] = undefined; if (rEvent) { const eventJson = JSON.stringify(rEvent); - redis.batcher.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:events:${event.id}:current`, eventJson, 'EX', 604800); p.hset(`keystore:events:${event.guild_id}`, `cache:events:${event.id}`, 0); }); @@ -227,36 +229,6 @@ export default { redis.channelStatus.delAll(data.id); redis.pins.delAll(data.id); - const getPipeline = redis.cacheDb.pipeline(); - - getPipeline.hgetall(redis.audits.keystore(data.id)); - getPipeline.hgetall(redis.automods.keystore(data.id)); - getPipeline.hgetall(redis.bans.keystore(data.id)); - getPipeline.hgetall(redis.channels.keystore(data.id)); - getPipeline.hgetall(redis.commandPermissions.keystore(data.id)); - getPipeline.hgetall(redis.emojis.keystore(data.id)); - getPipeline.hgetall(redis.events.keystore(data.id)); - getPipeline.hgetall(redis.guildCommands.keystore(data.id)); - getPipeline.hgetall(redis.integrations.keystore(data.id)); - getPipeline.hgetall(redis.invites.keystore(data.id)); - getPipeline.hgetall(redis.members.keystore(data.id)); - getPipeline.hgetall(redis.messages.keystore(data.id)); - getPipeline.hgetall(redis.reactions.keystore(data.id)); - getPipeline.hgetall(redis.roles.keystore(data.id)); - getPipeline.hgetall(redis.soundboards.keystore(data.id)); - getPipeline.hgetall(redis.stages.keystore(data.id)); - getPipeline.hgetall(redis.stickers.keystore(data.id)); - getPipeline.hgetall(redis.threads.keystore(data.id)); - getPipeline.hgetall(redis.threadMembers.keystore(data.id)); - getPipeline.hgetall(redis.voices.keystore(data.id)); - getPipeline.hgetall(redis.webhooks.keystore(data.id)); - getPipeline.hgetall(redis.welcomeScreens.keystore(data.id)); - getPipeline.hgetall(redis.onboardings.keystore(data.id)); - getPipeline.hgetall(redis.eventUsers.keystore(data.id)); - - const results = await getPipeline.exec(); - if (!results) return; - const [ auditlogs, automods, @@ -282,9 +254,35 @@ export default { welcomeScreens, onboarding, eventUsers, - ] = results.map((result) => result[1] || {}); + ] = await Promise.all([ + redis.cacheDb.hscanKeys(redis.audits.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.automods.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.bans.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.channels.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.commandPermissions.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.emojis.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.events.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.guildCommands.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.integrations.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.invites.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.members.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.messages.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.reactions.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.roles.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.soundboards.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.stages.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.stickers.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.threads.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.threadMembers.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.voices.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.webhooks.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.welcomeScreens.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.onboardings.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.eventUsers.keystore(data.id)), + ]); const deletePipeline = redis.cacheDb.pipeline(); + deletePipeline.del(redis.guilds.keystore(data.id)); deletePipeline.del(redis.audits.keystore(data.id)); deletePipeline.del(redis.automods.keystore(data.id)); @@ -311,30 +309,30 @@ export default { deletePipeline.del(redis.onboardings.keystore(data.id)); deletePipeline.del(redis.eventUsers.keystore(data.id)); - deletePipeline.del(...Object.keys(auditlogs)); - deletePipeline.del(...Object.keys(automods)); - deletePipeline.del(...Object.keys(bans)); - deletePipeline.del(...Object.keys(channels)); - deletePipeline.del(...Object.keys(commandPermissions)); - deletePipeline.del(...Object.keys(emojis)); - deletePipeline.del(...Object.keys(events)); - deletePipeline.del(...Object.keys(guildCommands)); - deletePipeline.del(...Object.keys(integrations)); - deletePipeline.del(...Object.keys(invites)); - deletePipeline.del(...Object.keys(members)); - deletePipeline.del(...Object.keys(messages)); - deletePipeline.del(...Object.keys(reactions)); - deletePipeline.del(...Object.keys(roles)); - deletePipeline.del(...Object.keys(soundboards)); - deletePipeline.del(...Object.keys(stages)); - deletePipeline.del(...Object.keys(stickers)); - deletePipeline.del(...Object.keys(threads)); - deletePipeline.del(...Object.keys(threadMembers)); - deletePipeline.del(...Object.keys(voices)); - deletePipeline.del(...Object.keys(webhooks)); - deletePipeline.del(...Object.keys(welcomeScreens)); - deletePipeline.del(...Object.keys(onboarding)); - deletePipeline.del(...Object.keys(eventUsers)); + deletePipeline.del(...auditlogs); + deletePipeline.del(...automods); + deletePipeline.del(...bans); + deletePipeline.del(...channels); + deletePipeline.del(...commandPermissions); + deletePipeline.del(...emojis); + deletePipeline.del(...events); + deletePipeline.del(...guildCommands); + deletePipeline.del(...integrations); + deletePipeline.del(...invites); + deletePipeline.del(...members); + deletePipeline.del(...messages); + deletePipeline.del(...reactions); + deletePipeline.del(...roles); + deletePipeline.del(...soundboards); + deletePipeline.del(...stages); + deletePipeline.del(...stickers); + deletePipeline.del(...threads); + deletePipeline.del(...threadMembers); + deletePipeline.del(...voices); + deletePipeline.del(...webhooks); + deletePipeline.del(...welcomeScreens); + deletePipeline.del(...onboarding); + deletePipeline.del(...eventUsers); await deletePipeline.exec(); }, @@ -348,9 +346,9 @@ export default { firstGuildInteraction(data.guild_id); cache.emojis.set(data.guild_id, data.emojis.length); - const emojis = await redis.cacheDb.hgetall(redis.emojis.keystore(data.guild_id)); + const emojiKeys = await redis.cacheDb.hscanKeys(redis.emojis.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(emojis)); + pipeline.del(...emojiKeys); pipeline.del(redis.emojis.keystore(data.guild_id)); await pipeline.exec(); @@ -363,7 +361,8 @@ export default { const success = await firstGuildInteraction(data.guild_id); if (success) return; - tasks.integrations(data.guild_id); + const memberCount = cache.members.get(data.guild_id) || 0; + priorityQueue.enqueueGuildTask(data.guild_id, memberCount, 'integrations'); }, [GatewayDispatchEvents.GuildMemberAdd]: async (data: GatewayGuildMemberAddDispatchData) => { @@ -387,7 +386,7 @@ export default { // eslint-disable-next-line no-console console.log('[Chunk] Finished receiving member chunks for', data.guild_id); - cache.requestingGuild = null; + priorityQueue.onMemberChunkComplete(data.guild_id); } if (data.chunk_index === 0) { @@ -536,9 +535,9 @@ export default { firstGuildInteraction(data.guild_id); cache.sounds.set(data.guild_id, data.soundboard_sounds.length); - const sounds = await redis.cacheDb.hgetall(redis.soundboards.keystore(data.guild_id)); + const soundKeys = await redis.cacheDb.hscanKeys(redis.soundboards.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(sounds)); + pipeline.del(...soundKeys); pipeline.del(redis.soundboards.keystore(data.guild_id)); await pipeline.exec(); @@ -553,9 +552,9 @@ export default { firstGuildInteraction(data.guild_id); cache.stickers.set(data.guild_id, data.stickers.length); - const stickers = await redis.cacheDb.hgetall(redis.stickers.keystore(data.guild_id)); + const stickerKeys = await redis.cacheDb.hscanKeys(redis.stickers.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(stickers)); + pipeline.del(...stickerKeys); pipeline.del(redis.stickers.keystore(data.guild_id)); await pipeline.exec(); diff --git a/src/BaseClient/Bot/CacheHandlers/Message.ts b/src/BaseClient/Bot/CacheHandlers/Message.ts index e503164..08577ed 100644 --- a/src/BaseClient/Bot/CacheHandlers/Message.ts +++ b/src/BaseClient/Bot/CacheHandlers/Message.ts @@ -13,7 +13,6 @@ import { import { AllThreadGuildChannelTypes } from '../../../Typings/Channel.js'; import evalFn from '../../../Util/eval.js'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -23,7 +22,6 @@ export default { redis.messages.set(data, data.guild_id || '@me'); firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } if (!data.webhook_id) redis.users.set(data.author); @@ -39,7 +37,6 @@ export default { [GatewayDispatchEvents.MessageDelete]: async (data: GatewayMessageDeleteDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } redis.messages.del(data.channel_id, data.id); @@ -49,7 +46,6 @@ export default { [GatewayDispatchEvents.MessageDeleteBulk]: async (data: GatewayMessageDeleteBulkDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } data.ids.forEach((id) => { @@ -61,7 +57,6 @@ export default { [GatewayDispatchEvents.MessageUpdate]: async (data: GatewayMessageUpdateDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.messages.set(data, data.guild_id); } @@ -70,7 +65,6 @@ export default { [GatewayDispatchEvents.MessagePollVoteAdd]: async (data: GatewayMessagePollVoteDispatchData) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.MessagePollVoteRemove]: async ( @@ -78,7 +72,6 @@ export default { ) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.MessageReactionAdd]: async ( @@ -91,7 +84,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); const cache = await redis.reactions.get( data.channel_id, @@ -125,7 +117,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); const cache = await redis.reactions.get( data.channel_id, @@ -159,16 +150,18 @@ export default { ) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } - const pipeline = redis.cacheDb.pipeline(); - const reactions = await redis.cacheDb.hgetall(redis.reactions.keystore(data.message_id)); - pipeline.hdel( + const reactionKeys = await redis.cacheDb.hscanKeys( redis.reactions.keystore(data.message_id), - ...Object.keys(reactions).filter((r) => r.includes(data.message_id)), + `*${data.message_id}*`, ); - pipeline.del(...Object.keys(reactions).filter((r) => r.includes(data.message_id))); + + if (reactionKeys.length === 0) return; + + const pipeline = redis.cacheDb.pipeline(); + pipeline.hdel(redis.reactions.keystore(data.message_id), ...reactionKeys); + pipeline.del(...reactionKeys); await pipeline.exec(); }, @@ -177,15 +170,18 @@ export default { ) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); - - const reactions = await redis.cacheDb.hgetall(redis.reactions.keystore(data.guild_id)); - const pipeline = redis.cacheDb.pipeline(); - const filteredReactions = Object.keys(reactions).filter( - (r) => r.includes(data.message_id) && r.includes((data.emoji.id || data.emoji.name)!), + const emojiId = data.emoji.id || data.emoji.name; + const reactionKeys = await redis.cacheDb.hscanKeys( + redis.reactions.keystore(data.guild_id), + `*${data.message_id}*`, ); + const filteredReactions = reactionKeys.filter((r) => r.includes(emojiId!)); + + if (filteredReactions.length === 0) return; + + const pipeline = redis.cacheDb.pipeline(); pipeline.hdel(redis.reactions.keystore(data.guild_id), ...filteredReactions); pipeline.del(...filteredReactions); await pipeline.exec(); diff --git a/src/BaseClient/Bot/CacheHandlers/Stage.ts b/src/BaseClient/Bot/CacheHandlers/Stage.ts index 48360aa..c499bef 100644 --- a/src/BaseClient/Bot/CacheHandlers/Stage.ts +++ b/src/BaseClient/Bot/CacheHandlers/Stage.ts @@ -5,7 +5,6 @@ import { type GatewayStageInstanceUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -14,7 +13,6 @@ export default { data: GatewayStageInstanceCreateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.set(data); }, @@ -23,7 +21,6 @@ export default { data: GatewayStageInstanceDeleteDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.del(data.id); }, @@ -32,7 +29,6 @@ export default { data: GatewayStageInstanceUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.set(data); }, diff --git a/src/BaseClient/Bot/CacheHandlers/Thread.ts b/src/BaseClient/Bot/CacheHandlers/Thread.ts index 0083bf9..17bd2f3 100644 --- a/src/BaseClient/Bot/CacheHandlers/Thread.ts +++ b/src/BaseClient/Bot/CacheHandlers/Thread.ts @@ -8,7 +8,6 @@ import { type GatewayThreadUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -26,38 +25,30 @@ export default { firstGuildInteraction(data.guild_id); - const selectPipeline = redis.cacheDb.pipeline(); - selectPipeline.hgetall(redis.threadMembers.keystore(data.guild_id)); - selectPipeline.hgetall(redis.messages.keystore(data.guild_id)); - const result = await selectPipeline.exec(); - if (!result) return; + const [threadMemberKeys, messageKeys] = await Promise.all([ + redis.cacheDb.hscanKeys(redis.threadMembers.keystore(data.guild_id), `*${data.id}*`), + redis.cacheDb.hscanKeys(redis.messages.keystore(data.guild_id), `*${data.id}*`), + ]); + + if (threadMemberKeys.length === 0 && messageKeys.length === 0) return; - const [threadMembers, messages] = result; const deletePipeline = redis.cacheDb.pipeline(); - deletePipeline.hdel( - redis.threadMembers.keystore(data.guild_id), - ...Object.keys(threadMembers).filter((m) => m.includes(data.id)), - ); - deletePipeline.del(...Object.keys(threadMembers).filter((m) => m.includes(data.id))); + if (threadMemberKeys.length > 0) { + deletePipeline.hdel(redis.threadMembers.keystore(data.guild_id), ...threadMemberKeys); + deletePipeline.del(...threadMemberKeys); + } - deletePipeline.hdel( - redis.messages.keystore(data.guild_id), - ...Object.keys(messages).filter((m) => m.includes(data.id)), - ); - deletePipeline.del(...Object.keys(messages).filter((m) => m.includes(data.id))); + if (messageKeys.length > 0) { + deletePipeline.hdel(redis.messages.keystore(data.guild_id), ...messageKeys); + deletePipeline.del(...messageKeys); + } await deletePipeline.exec(); }, [GatewayDispatchEvents.ThreadUpdate]: async (data: GatewayThreadUpdateDispatchData) => { - if (!data.guild_id) { - redis.threads.set(data); - return; - } - - firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.id, data.guild_id); + if (data.guild_id) firstGuildInteraction(data.guild_id); redis.threads.set(data); }, @@ -81,7 +72,6 @@ export default { data: GatewayThreadMembersUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.id, data.guild_id); data.added_members?.forEach((threadMember) => { redis.threadMembers.set(threadMember, data.guild_id); @@ -97,7 +87,6 @@ export default { data: GatewayThreadMemberUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - if (data.id) firstChannelInteraction(data.id, data.guild_id); redis.threadMembers.set(data, data.guild_id); diff --git a/src/BaseClient/Bot/CacheHandlers/Voice.ts b/src/BaseClient/Bot/CacheHandlers/Voice.ts index a9a7db5..70544b2 100644 --- a/src/BaseClient/Bot/CacheHandlers/Voice.ts +++ b/src/BaseClient/Bot/CacheHandlers/Voice.ts @@ -5,7 +5,6 @@ import { type GatewayVoiceServerUpdateDispatchData, } from 'discord-api-types/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -14,7 +13,6 @@ export default { data: GatewayVoiceChannelEffectSendDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.VoiceServerUpdate]: async (data: GatewayVoiceServerUpdateDispatchData) => { @@ -25,7 +23,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - if (data.channel_id) firstChannelInteraction(data.channel_id, data.guild_id); redis.voices.set(data); }, diff --git a/src/BaseClient/Bot/CacheHandlers/index.ts b/src/BaseClient/Bot/CacheHandlers/index.ts index 33a2a9e..4e405b8 100644 --- a/src/BaseClient/Bot/CacheHandlers/index.ts +++ b/src/BaseClient/Bot/CacheHandlers/index.ts @@ -21,9 +21,10 @@ import { AllThreadGuildChannelTypes, } from '../../../Typings/Channel.js'; import emit from '../../../Util/EventBus.js'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; -import firstGuildInteraction, { tasks } from '../../../Util/firstGuildInteraction.js'; +import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; +import { cache } from '../Client.js'; import ready from '../Events/ready.js'; import AutoModeration from './AutoModeration.js'; @@ -47,14 +48,14 @@ export default (data: GatewayDispatchPayload, shardId: number) => { | Promise | unknown; if (res instanceof Promise) { - res.then(() => emit(data.t, data.d)).catch(() => emit(data.t, data.d)); + res.then(() => emit.call(redis, data.t, data.d)).catch(() => emit.call(redis, data.t, data.d)); } else { - emit(data.t, data.d); + emit.call(redis, data.t, data.d); } } catch (err) { // eslint-disable-next-line no-console console.error(`[CacheHandler] Error processing ${data.t}:`, err); - emit(data.t, data.d); + emit.call(redis, data.t, data.d); } }; @@ -92,9 +93,6 @@ const caches: Record< [GatewayDispatchEvents.InteractionCreate]: async (data: GatewayInteractionCreateDispatchData) => { if (data.guild_id) firstGuildInteraction(data.guild_id); - if (data.channel?.id && data.guild_id) { - firstChannelInteraction(data.channel.id, data.guild_id); - } if (data.user) redis.users.set(data.user); if (data.message && data.guild_id) { @@ -124,13 +122,13 @@ const caches: Record< }, [GatewayDispatchEvents.WebhooksUpdate]: (data: GatewayWebhooksUpdateDispatchData) => { - tasks.webhooks(data.guild_id); + const memberCount = cache.members.get(data.guild_id) || 0; + priorityQueue.enqueueGuildTask(data.guild_id, memberCount, 'webhooks'); }, [GatewayDispatchEvents.TypingStart]: async (data: GatewayTypingStartDispatchData) => { if (!data.member || !data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.members.set(data.member, data.guild_id); }, diff --git a/src/BaseClient/Bot/Client.ts b/src/BaseClient/Bot/Client.ts index 3c2984c..51c37d8 100644 --- a/src/BaseClient/Bot/Client.ts +++ b/src/BaseClient/Bot/Client.ts @@ -60,6 +60,7 @@ export const client = new Client({ rest, gateway }); export const { api } = client; export const cluster = new ClusterClient(client); export const cache: { + approxGuilds: number; guilds: number; members: Map; emojis: Map; @@ -67,13 +68,8 @@ export const cache: { stickers: Map; sounds: Map; user: APIUser | null; - requestingGuild: string | null; - requestGuildQueue: Set; - requestingPins: string | null; - requestPinsQueue: Set; - requestPinsGuildMap: Map; - requestPinsPaused: boolean; } = { + approxGuilds: await api.applications.getCurrent().then((app) => app.approximate_guild_count ?? 0), guilds: 0, members: new Map(), emojis: new Map(), @@ -81,10 +77,4 @@ export const cache: { stickers: new Map(), sounds: new Map(), user: null, - requestingGuild: null, - requestGuildQueue: new Set(), - requestingPins: null, - requestPinsQueue: new Set(), - requestPinsGuildMap: new Map(), - requestPinsPaused: false, }; diff --git a/src/BaseClient/Bot/Events/ready.ts b/src/BaseClient/Bot/Events/ready.ts index 9f683a8..3eff28e 100644 --- a/src/BaseClient/Bot/Events/ready.ts +++ b/src/BaseClient/Bot/Events/ready.ts @@ -8,6 +8,7 @@ import { import { getInfo } from 'discord-hybrid-sharding'; import { scheduleJob } from 'node-schedule'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; import { cache, client, cluster, gateway } from '../Client.js'; @@ -27,6 +28,8 @@ export default async (data: GatewayReadyDispatchData, shardId: number | string) .SHARD_LIST.map((shard) => shard + 1) .join(', ')}`, ); + + priorityQueue.initializeColdStartDetection(); } console.log(`[Ready | Shard ${shardId}]`); diff --git a/src/BaseClient/Cluster/Manager.ts b/src/BaseClient/Cluster/Manager.ts index 19e5582..15baf5d 100644 --- a/src/BaseClient/Cluster/Manager.ts +++ b/src/BaseClient/Cluster/Manager.ts @@ -9,31 +9,19 @@ const manager = new ClusterManager('./dist/bot.js', { shardsPerClusters: 10, token: (process.argv.includes('--dev') ? process.env.DevToken : process.env.Token) ?? '', shardArgs: process.argv, - execArgv: [ - '--max-old-space-size=512', - '--experimental-json-modules', - '--inspect=0.0.0.0:9229', - ...(process.argv.includes('--dev') ? [] : ['--no-deprecation', '--no-warnings']), - ], + execArgv: [], respawn: true, mode: 'process', }); -await manager - .spawn() - .then(() => { - setInterval(async () => { - await manager.broadcastEval('this.status && this.isReady() ? this.reconnect() : 0'); - }, 60000); - }) - .catch((e: Response) => { - console.log( - `[Cluster Manager] Startup Failed. Retry after: ${ - Number(e.headers?.get('retry-after') ?? 0) / 60 - } Minutes`, - ); - console.error(e); - process.exit(1); - }); +await manager.spawn().catch((e: Response) => { + console.log( + `[Cluster Manager] Startup Failed. Retry after: ${ + Number(e.headers?.get('retry-after') ?? 0) / 60 + } Minutes`, + ); + console.error(e); + process.exit(1); +}); export default manager; diff --git a/src/BaseClient/Cluster/Redis.ts b/src/BaseClient/Cluster/Redis.ts index 420cb1e..6ec15f6 100644 --- a/src/BaseClient/Cluster/Redis.ts +++ b/src/BaseClient/Cluster/Redis.ts @@ -1,11 +1,11 @@ -import Redis from 'ioredis'; +import { createRedisWrapper } from '@ayako/utility'; const cacheDBnum = process.argv.includes('--dev') ? process.env.devCacheDB : process.env.cacheDB; if (!cacheDBnum && typeof cacheDBnum !== 'number') { throw new Error('No cache DB number provided in env vars'); } -export const cacheDB = new Redis({ +export const cacheDB = createRedisWrapper({ host: process.argv.includes('--local') ? 'localhost' : 'redis', db: Number(cacheDBnum), }); diff --git a/src/Util/EventBus.ts b/src/Util/EventBus.ts index 7e5ec28..f2ee4f0 100644 --- a/src/Util/EventBus.ts +++ b/src/Util/EventBus.ts @@ -1,12 +1,19 @@ +import type { Cache } from '@ayako/utility'; import type { GatewayDispatchEvents, GatewayDispatchPayload } from 'discord-api-types/gateway/v10'; -import cache from '../BaseClient/Bot/Cache.js'; +import { priorityQueue } from './PriorityQueue/index.js'; -const emit = (type: GatewayDispatchEvents, data: GatewayDispatchPayload['d']) => { - // eslint-disable-next-line no-console - if (process.argv.includes('--debug')) console.log(`[EventBus] Emitting event: ${type}`); +export default function ( + this: Cache, + type: GatewayDispatchEvents, + data: GatewayDispatchPayload['d'], +) { + if (priorityQueue.isColdStart) { + this.logger.debug(`[EventBus] Skipping event during cold start: ${type}`); + return; + } - cache.cacheDb.publish(type, JSON.stringify(data)); -}; + this.logger.debug(`[EventBus] Emitting event: ${type}`); -export default emit; + this.cachePub.publish(type, JSON.stringify(data)); +} diff --git a/src/Util/PriorityQueue/BinaryHeap.ts b/src/Util/PriorityQueue/BinaryHeap.ts new file mode 100644 index 0000000..9b97930 --- /dev/null +++ b/src/Util/PriorityQueue/BinaryHeap.ts @@ -0,0 +1,148 @@ +/** + * Generic Binary Heap (Max-Heap by default) + * O(log n) insertion and extraction + */ +export class BinaryHeap { + private heap: T[] = []; + private comparator: (a: T, b: T) => number; + + /** + * Creates a new binary heap + * @param comparator Function that returns: + * - negative if a should come before b (higher priority) + * - positive if b should come before a (higher priority) + * - zero if equal priority + */ + constructor(comparator: (a: T, b: T) => number) { + this.comparator = comparator; + } + + /** + * Number of items in the heap + */ + get size(): number { + return this.heap.length; + } + + /** + * Check if heap is empty + */ + get isEmpty(): boolean { + return this.heap.length === 0; + } + + /** + * Insert an item into the heap + * O(log n) + */ + push(item: T): void { + this.heap.push(item); + this.bubbleUp(this.heap.length - 1); + } + + /** + * Remove and return the highest priority item + * O(log n) + */ + pop(): T | undefined { + if (this.heap.length === 0) return undefined; + if (this.heap.length === 1) return this.heap.pop(); + + const [top] = this.heap; + this.heap[0] = this.heap.pop()!; + this.bubbleDown(0); + return top; + } + + /** + * Peek at the highest priority item without removing it + * O(1) + */ + peek(): T | undefined { + return this.heap[0]; + } + + /** + * Clear all items from the heap + */ + clear(): void { + this.heap = []; + } + + /** + * Get all items (for debugging/inspection) + */ + toArray(): T[] { + return [...this.heap]; + } + + /** + * Remove an item that matches the predicate + * O(n) - use sparingly + */ + remove(predicate: (item: T) => boolean): T | undefined { + const index = this.heap.findIndex(predicate); + if (index === -1) return undefined; + + const item = this.heap[index]; + if (index === this.heap.length - 1) { + this.heap.pop(); + } else { + this.heap[index] = this.heap.pop()!; + // May need to bubble up or down depending on new item + const parent = Math.floor((index - 1) / 2); + if (index > 0 && this.comparator(this.heap[index], this.heap[parent]) < 0) { + this.bubbleUp(index); + } else { + this.bubbleDown(index); + } + } + return item; + } + + /** + * Check if an item matching predicate exists + * O(n) + */ + has(predicate: (item: T) => boolean): boolean { + return this.heap.some(predicate); + } + + /** + * Move item up the heap until heap property is satisfied + */ + private bubbleUp(index: number): void { + while (index > 0) { + const parent = Math.floor((index - 1) / 2); + if (this.comparator(this.heap[index], this.heap[parent]) >= 0) break; + + [this.heap[index], this.heap[parent]] = [this.heap[parent], this.heap[index]]; + index = parent; + } + } + + /** + * Move item down the heap until heap property is satisfied + */ + private bubbleDown(index: number): void { + const { length } = this.heap; + + while (true) { + const left = 2 * index + 1; + const right = 2 * index + 2; + let smallest = index; + + if (left < length && this.comparator(this.heap[left], this.heap[smallest]) < 0) { + smallest = left; + } + if (right < length && this.comparator(this.heap[right], this.heap[smallest]) < 0) { + smallest = right; + } + + if (smallest === index) break; + + [this.heap[index], this.heap[smallest]] = [this.heap[smallest], this.heap[index]]; + index = smallest; + } + } +} diff --git a/src/Util/PriorityQueue/ColdStartDetector.ts b/src/Util/PriorityQueue/ColdStartDetector.ts new file mode 100644 index 0000000..e563d00 --- /dev/null +++ b/src/Util/PriorityQueue/ColdStartDetector.ts @@ -0,0 +1,133 @@ +/** + * Cold Start Detector + * + * Detects when the bot is in a "cold start" state after FLUSHDB or fresh deployment. + * During cold start, event emission to Redis is suppressed to prevent queue overload + * and events being sent with major delay due to paused queue. + * + * Cold start is detected when: + * - guild-interacts, channel-interacts, hashes + * have fewer than 10% of the bot's guild count entries + * + * Cold start ends when: + * - Redis queue size drops below 5000 + */ +import redis from '../../BaseClient/Bot/Cache.js'; +import { cache as clientCache } from '../../BaseClient/Bot/Client.js'; + +class ColdStartDetector { + private isColdstart = false; + private isInitialized = false; + private checkInterval: ReturnType | null = null; + + protected coldStartThresholdPercentage = 0.1; + protected coldStartEndQueueSize = 5000; + protected checkIntervalTime = 1000; + + /** + * Whether the bot is currently in cold start mode + */ + get isColdStart(): boolean { + return this.isColdstart; + } + + /** + * Initialize cold start detection + * Call this after the bot has received guild count from Discord + */ + async initialize(): Promise { + if (this.isInitialized) return; + this.isInitialized = true; + + const guildCount = clientCache.approxGuilds; + if (guildCount === 0) { + this.isColdstart = true; + // eslint-disable-next-line no-console + console.log('[ColdStart] Guild count is 0, assuming cold start'); + this.startEndCheck(); + return; + } + + const threshold = Math.floor(guildCount * this.coldStartThresholdPercentage); + + const guildInteractsSize = (await redis.cacheDb.call('HLEN', 'guild-interacts')) as number; + const isCold = guildInteractsSize < threshold; + + if (isCold) { + this.isColdstart = true; + // eslint-disable-next-line no-console + console.log( + `[ColdStart] Detected cold start | Guilds: ${guildCount} | Threshold: ${threshold} | ` + + `guild-interacts: ${guildInteractsSize} | `, + ); + + this.startEndCheck(); + } else { + // eslint-disable-next-line no-console + console.log( + `[ColdStart] Normal start detected | Guilds: ${guildCount} | ` + + `guild-interacts: ${guildInteractsSize} | `, + ); + } + } + + /** + * Start checking if cold start has ended + */ + private startEndCheck(): void { + if (this.checkInterval) return; + + this.checkInterval = setInterval(() => { + this.checkColdStartEnd(); + }, this.checkIntervalTime); + } + + /** + * Check if cold start should end based on Redis queue size + */ + private checkColdStartEnd(): void { + const queueSize = redis.cacheDb.getQueueSize(); + + if (queueSize < this.coldStartEndQueueSize) { + this.isColdstart = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + + // eslint-disable-next-line no-console + console.log(`[ColdStart] Cold start ended | Redis queue size: ${queueSize}`); + } + } + + /** + * Manually end cold start (for testing or manual override) + */ + endColdStart(): void { + this.isColdstart = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + + // eslint-disable-next-line no-console + console.log('[ColdStart] Cold start manually ended'); + } + + /** + * Reset detector state (for testing) + */ + reset(): void { + this.isColdstart = false; + this.isInitialized = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + } +} + +export const coldStartDetector = new ColdStartDetector(); diff --git a/src/Util/PriorityQueue/GatewayQueue.ts b/src/Util/PriorityQueue/GatewayQueue.ts new file mode 100644 index 0000000..09264a9 --- /dev/null +++ b/src/Util/PriorityQueue/GatewayQueue.ts @@ -0,0 +1,172 @@ +/** + * Gateway Queue - Handles member chunk requests via Gateway opcode 8 + * + * - Single request at a time (must wait for all chunks) + * - Priority by member count (larger guilds first) + * - Notified when chunks complete via onChunkComplete() + */ +import { getRandom } from '@ayako/utility'; +import { GatewayOpcodes } from 'discord-api-types/gateway/v10'; + +import RedisCache from '../../BaseClient/Bot/Cache.js'; +import { gateway } from '../../BaseClient/Bot/Client.js'; +import calculateShardId from '../calculateShardId.js'; + +import { BinaryHeap } from './BinaryHeap.js'; +import { CONFIG, type GatewayQueueItem } from './types.js'; + +/** + * Priority comparator for gateway queue items + * Higher member count = higher priority (comes first) + * Equal member count: FIFO by addedAt + */ +const gatewayComparator = (a: GatewayQueueItem, b: GatewayQueueItem): number => { + if (a.memberCount !== b.memberCount) return b.memberCount - a.memberCount; + return a.addedAt - b.addedAt; +}; + +class GatewayQueue { + private queue = new BinaryHeap(gatewayComparator); + private currentGuildId: string | null = null; + private processingInterval: ReturnType | null = null; + private isProcessing = false; + private chunkTimeout: ReturnType | null = null; + private static readonly chunkTimeoutMS = 30000; + + /** + * Get the currently processing guild ID + */ + get currentGuild(): string | null { + return this.currentGuildId; + } + + /** + * Get the number of items in the queue + */ + get size(): number { + return this.queue.size; + } + + /** + * Start the queue processing interval + */ + start(): void { + if (this.processingInterval) return; + this.processingInterval = setInterval(() => { + this.process().catch((err) => { + // eslint-disable-next-line no-console + console.error('[GatewayQueue] Error processing queue:', err); + }); + }, CONFIG.GATEWAY_INTERVAL); + // eslint-disable-next-line no-console + console.log('[GatewayQueue] Started'); + } + + /** + * Stop the queue processing interval + */ + stop(): void { + if (this.processingInterval) { + clearInterval(this.processingInterval); + this.processingInterval = null; + } + + if (this.chunkTimeout) { + clearTimeout(this.chunkTimeout); + this.chunkTimeout = null; + } + + // eslint-disable-next-line no-console + console.log('[GatewayQueue] Stopped'); + } + + /** + * Add a guild to the queue for member chunk request + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + async enqueue(guildId: string, memberCount: number): Promise { + if (memberCount === 0) return; + + if (this.currentGuildId === guildId) return; + if (this.queue.has((item) => item.guildId === guildId)) return; + + const [alreadyRequested] = await RedisCache.execPipeline<[string | null]>((pipeline) => { + pipeline.hget('guild-members-requested', guildId); + pipeline.hset('guild-members-requested', guildId, '1'); + pipeline.call( + 'hexpire', + 'guild-members-requested', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + guildId, + ); + }); + if (alreadyRequested === '1') return; + + this.queue.push({ + type: 'gateway', + guildId, + memberCount, + addedAt: Date.now(), + }); + } + + /** + * Called when all member chunks have been received for a guild + * This unblocks the queue to process the next guild + */ + onChunkComplete(guildId: string): void { + if (this.currentGuildId === guildId) { + this.currentGuildId = null; + + if (this.chunkTimeout) { + clearTimeout(this.chunkTimeout); + this.chunkTimeout = null; + } + } + } + + /** + * Process the next item in the queue + */ + private async process(): Promise { + if (this.isProcessing) return; + if (this.currentGuildId !== null) return; + if (this.queue.isEmpty) return; + + this.isProcessing = true; + + try { + const item = this.queue.pop(); + if (!item) return; + + this.currentGuildId = item.guildId; + + gateway.send(calculateShardId(item.guildId), { + op: GatewayOpcodes.RequestGuildMembers, + d: { guild_id: item.guildId, presences: false, limit: 0, query: '' }, + }); + + this.chunkTimeout = setTimeout(() => { + if (this.currentGuildId === item.guildId) { + // eslint-disable-next-line no-console + console.log(`[GatewayQueue] Timeout waiting for chunks from ${item.guildId}, skipping`); + this.currentGuildId = null; + this.chunkTimeout = null; + } + }, GatewayQueue.chunkTimeoutMS); + + // eslint-disable-next-line no-console + console.log( + `[GatewayQueue] Requested members for ${item.guildId} (${item.memberCount} members) | Queue: ${this.queue.size}`, + ); + } finally { + this.isProcessing = false; + } + } +} + +export const gatewayQueue = new GatewayQueue(); diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts new file mode 100644 index 0000000..bdd443c --- /dev/null +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -0,0 +1,595 @@ +/** + * REST Queue - Handles all Discord REST API calls with rate limiting + * + * - Max 5 concurrent requests (configurable) + * - Per-endpoint rate limit tracking + * - Re-queues on 429 with retry_after delay + * - Priority by member count (larger guilds first) + */ +import { getGuildPerms } from '@ayako/utility'; +import { RESTEvents } from '@discordjs/rest'; +import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; + +import redis from '../../BaseClient/Bot/Cache.js'; +import { api, cache as clientCache } from '../../BaseClient/Bot/Client.js'; +import requestEventSubscribers from '../requestEventSubscribers.js'; +import requestVoiceChannelStatuses from '../requestVoiceChannelStatuses.js'; + +import { BinaryHeap } from './BinaryHeap.js'; +import { CONFIG, type BucketState, type GuildTaskName, type RestQueueItem } from './types.js'; + +/** + * Priority comparator for REST queue items + * Higher member count = higher priority + * Equal member count: FIFO by addedAt + */ +const restComparator = (a: RestQueueItem, b: RestQueueItem): number => { + if (a.memberCount !== b.memberCount) return b.memberCount - a.memberCount; + return a.addedAt - b.addedAt; +}; + +class RestQueue { + private queue = new BinaryHeap(restComparator); + private activeRequests = 0; + private processingInterval: ReturnType | null = null; + private isProcessing = false; + private completedCount = 0; + private inFlight = new Set(); + + private buckets = new Map(); + private routeToBucket = new Map(); + + /** + * Get the number of items in the queue + */ + get size(): number { + return this.queue.size; + } + + /** + * Get the number of active requests + */ + get active(): number { + return this.activeRequests; + } + + /** + * Start the queue processing interval + */ + start(): void { + if (this.processingInterval) return; + + this.setupRateLimitListener(); + + this.processingInterval = setInterval(() => { + this.process().catch((err) => { + // eslint-disable-next-line no-console + console.error('[RestQueue] Error processing queue:', err); + }); + }, CONFIG.REST_INTERVAL); + // eslint-disable-next-line no-console + console.log('[RestQueue] Started'); + } + + /** + * Stop the queue processing interval + */ + stop(): void { + if (this.processingInterval) { + clearInterval(this.processingInterval); + this.processingInterval = null; + // eslint-disable-next-line no-console + console.log('[RestQueue] Stopped'); + } + } + + /** + * Generate a unique key for a task (used for in-flight tracking) + */ + private getTaskKey(id: string, taskName: string): string { + return `guild:${id}:${taskName}`; + } + + /** + * Check if a guild task is already queued or in-flight + */ + private hasGuildTask(guildId: string, taskName?: string): boolean { + if (taskName) { + if (this.inFlight.has(this.getTaskKey(guildId, taskName))) return true; + } else { + for (const key of this.inFlight) { + if (key.startsWith(`guild:${guildId}:`)) return true; + } + } + return this.queue.has( + (item) => item.guildId === guildId && (taskName ? item.taskName === taskName : true), + ); + } + + //#region Bucket-Based Rate Limiting + + /** + * Normalize an endpoint to a route pattern for bucket grouping + * Converts: "channels/123456789012345678/pins" -> "channels/:id/pins" + */ + private normalizeRoute(endpoint: string): string { + return endpoint + .replace(/\d{17,19}/g, ':id') + .replace(/\/reactions\/(.*)/, '/reactions/:reaction') + .replace(/\/webhooks\/:id\/[^/?]+/, '/webhooks/:id/:token'); + } + + /** + * Generate a bucket key combining method and normalized route + */ + private generateBucketKey(method: string, endpoint: string): string { + return `${method}:${this.normalizeRoute(endpoint)}`; + } + + /** + * Check if an endpoint is currently rate limited based on bucket + */ + private isEndpointBlocked(method: string, endpoint: string): boolean { + const bucketKey = this.generateBucketKey(method, endpoint); + const bucketHash = this.routeToBucket.get(bucketKey); + + if (!bucketHash) return false; // Unknown bucket - allow request + + const bucket = this.buckets.get(bucketHash); + if (!bucket) return false; + + const now = Date.now(); + + // Check if bucket has reset + if (now >= bucket.resetAt) { + bucket.blocked = false; + bucket.remaining = bucket.limit; + return false; + } + + return bucket.blocked; + } + + /** + * Setup listener for REST rate limit events via Response event + */ + private setupRateLimitListener(): void { + api.rest.on(RESTEvents.Response, (request, response) => { + if (response.status !== 429) return; + + const retryAfter = response.headers.get('retry-after'); + const bucket = response.headers.get('x-ratelimit-bucket'); + const scope = response.headers.get('x-ratelimit-scope') as 'user' | 'global' | 'shared' | null; + + if (!retryAfter) return; + + const retryAfterMs = parseFloat(retryAfter) * 1000; + const route = this.normalizeRoute(request.path); + const bucketKey = this.generateBucketKey(request.method, request.path); + const bucketHash = bucket ?? `unknown:${route}`; + + this.routeToBucket.set(bucketKey, bucketHash); + this.buckets.set(bucketHash, { + bucketHash, + route, + method: request.method, + remaining: 0, + limit: 1, + resetAt: Date.now() + retryAfterMs, + blocked: true, + scope: scope ?? 'user', + }); + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Bucket learned from 429: ${route} -> ${bucketHash} (retry in ${retryAfterMs}ms)`, + ); + }); + } + + //#endregion + + /** + * Enqueue all guild tasks for first guild interaction + */ + enqueueGuildTasks(guildId: string, memberCount: number): void { + if (this.hasGuildTask(guildId)) return; + + const now = Date.now(); + const guildTasks: GuildTaskName[] = [ + 'vcStatus', + 'autoModRules', + 'commands', + 'commandPermissions', + 'welcomeScreen', + 'onboarding', + 'scheduledEvents', + 'webhooks', + 'integrations', + 'invites', + ]; + + for (const taskName of guildTasks) { + this.queue.push({ + type: 'guild', + id: guildId, + guildId, + memberCount, + taskName, + endpoint: `guilds/${guildId}/${taskName}`, + addedAt: now, + }); + } + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Enqueued ${guildTasks.length} guild tasks for guilds/${guildId}/* | Queue: ${this.queue.size}`, + ); + } + + /** + * Enqueue a single guild task (for subsequent updates, not first interaction) + */ + enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { + if (this.hasGuildTask(guildId, taskName)) return; + + const endpoint = `guilds/${guildId}/${taskName}`; + this.queue.push({ + type: 'guild', + id: guildId, + guildId, + memberCount, + taskName, + endpoint, + addedAt: Date.now(), + }); + + // eslint-disable-next-line no-console + console.log(`[RestQueue] Enqueued ${endpoint} | Queue: ${this.queue.size}`); + } + + /** + * Process items from the queue + */ + private async process(): Promise { + if (this.isProcessing) return; + if (this.activeRequests >= CONFIG.REST_MAX_CONCURRENT) return; + if (this.queue.isEmpty) return; + + const redisQueueSize = redis.cacheDb.getQueueSize(); + if (redisQueueSize > CONFIG.REDIS_QUEUE_THRESHOLD) { + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Backpressure: Redis queue ${redisQueueSize} > threshold ${CONFIG.REDIS_QUEUE_THRESHOLD}, pausing`, + ); + return; + } + + this.isProcessing = true; + + try { + while (this.activeRequests < CONFIG.REST_MAX_CONCURRENT && !this.queue.isEmpty) { + const currentQueueSize = redis.cacheDb.getQueueSize(); + if (currentQueueSize > CONFIG.REDIS_QUEUE_THRESHOLD) { + // eslint-disable-next-line no-console + console.log(`[RestQueue] Backpressure mid-loop: Redis queue ${currentQueueSize}, stopping`); + break; + } + + const item = this.findNextUnblockedItem(); + if (!item) break; + + this.executeTask(item).catch((err) => { + // eslint-disable-next-line no-console + console.error(`[RestQueue] Task ${item.taskName} failed:`, err); + }); + } + } finally { + this.isProcessing = false; + } + } + + /** + * Find the next item that isn't rate limited (using bucket-based checking) + */ + private findNextUnblockedItem(): RestQueueItem | undefined { + const items = this.queue.toArray(); + + for (const item of items) { + if (!this.isEndpointBlocked('GET', item.endpoint)) { + this.queue.remove((it) => it === item); + return item; + } + } + + return undefined; + } + + /** + * Handle rate limit (429) response - re-queue the item + */ + private onRateLimit(item: RestQueueItem, retryAfter: number): void { + this.queue.push(item); + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Rate limited on ${item.endpoint}, retry after ${retryAfter}ms | Queue: ${this.queue.size}`, + ); + } + + /** + * Execute a task with timeout + */ + private async executeTask(item: RestQueueItem): Promise { + const taskKey = this.getTaskKey(item.id, item.taskName); + this.inFlight.add(taskKey); + this.activeRequests++; + + try { + await Promise.race([ + this.executeGuildTask(item), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Task timeout')), CONFIG.TASK_TIMEOUT), + ), + ]); + + this.completedCount++; + if (this.completedCount % 10 === 0) { + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Completed ${this.completedCount} requests | Queue: ${this.queue.size} | Active: ${this.activeRequests}`, + ); + } + } catch (error: unknown) { + if (this.isRateLimitError(error)) { + const retryAfter = this.getRateLimitRetryAfter(error); + this.onRateLimit(item, retryAfter); + } else if (error instanceof Error && error.message === 'Task timeout') { + // eslint-disable-next-line no-console + console.log(`[RestQueue] Task timeout: ${item.endpoint} - re-queuing`); + this.queue.push(item); + } + } finally { + this.inFlight.delete(taskKey); + this.activeRequests--; + } + } + + /** + * Execute a guild task + */ + private async executeGuildTask(item: RestQueueItem): Promise { + const { guildId } = item; + + switch (item.taskName) { + case 'vcStatus': + await requestVoiceChannelStatuses(guildId); + break; + + case 'autoModRules': + await this.taskAutoModRules(guildId); + break; + + case 'commands': + await this.taskCommands(guildId); + break; + + case 'commandPermissions': + await this.taskCommandPermissions(guildId); + break; + + case 'welcomeScreen': + await this.taskWelcomeScreen(guildId); + break; + + case 'onboarding': + await this.taskOnboarding(guildId); + break; + + case 'scheduledEvents': + await this.taskScheduledEvents(guildId); + break; + + case 'webhooks': + await this.taskWebhooks(guildId); + break; + + case 'integrations': + await this.taskIntegrations(guildId); + break; + + case 'invites': + await this.taskInvites(guildId); + break; + + default: + break; + } + } + + //#region Guild Tasks + + private async taskAutoModRules(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const keystoreKey = redis.automods.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + const rules = await api.guilds.getAutoModerationRules(guildId).catch(() => []); + rules.forEach((r) => redis.automods.set(r)); + } + + private async taskCommands(guildId: string): Promise { + if (!clientCache.user) return; + + const keystoreKey = redis.commands.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const commands = await api.applicationCommands + .getGuildCommands(clientCache.user.id, guildId) + .catch(() => []); + commands.forEach((c) => redis.guildCommands.set({ ...c, guild_id: guildId })); + } + + private async taskCommandPermissions(guildId: string): Promise { + if (!clientCache.user) return; + + const keystoreKey = redis.commandPermissions.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const commandPerms = await api.applicationCommands + .getGuildCommandsPermissions(clientCache.user.id, guildId) + .catch(() => []); + + commandPerms.forEach((command) => + command.permissions.forEach((perm) => redis.commandPermissions.set(perm, guildId, command.id)), + ); + } + + private async taskWelcomeScreen(guildId: string): Promise { + const guild = await redis.guilds.get(guildId); + if (!guild) return; + + if (!guild.features.includes(GuildFeature.WelcomeScreenEnabled)) { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + } + + const keystoreKey = redis.welcomeScreens.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const welcomeScreen = await api.guilds.getWelcomeScreen(guildId).catch(() => null); + if (!welcomeScreen) return; + + redis.welcomeScreens.set(welcomeScreen, guildId); + } + + private async taskOnboarding(guildId: string): Promise { + const guild = await redis.guilds.get(guildId); + if (!guild) return; + + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const onboarding = await api.guilds.getOnboarding(guildId); + redis.onboardings.set(onboarding); + } + + private async taskScheduledEvents(guildId: string): Promise { + const keystoreKey = redis.events.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const scheduledEvents = await api.guilds + .getScheduledEvents(guildId, { with_user_count: true }) + .catch(() => []); + scheduledEvents.forEach((e) => redis.events.set(e)); + + const members = ( + await Promise.all(scheduledEvents.map((e) => requestEventSubscribers(e))) + ).flat(); + + members.forEach((u) => { + redis.users.set(u.user); + redis.eventUsers.set( + { + guild_id: guildId, + guild_scheduled_event_id: u.guildScheduledEventId, + user: u.user, + user_id: u.user.id, + member: u.member, + }, + guildId, + ); + + if (u.member) redis.members.set(u.member, guildId); + }); + } + + private async taskWebhooks(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ( + (perms.response & PermissionFlagsBits.ManageWebhooks) !== + PermissionFlagsBits.ManageWebhooks + ) { + return; + } + + const keystoreKey = redis.webhooks.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const webhooks = await api.guilds.getWebhooks(guildId).catch(() => []); + webhooks.forEach((w) => redis.webhooks.set(w)); + } + + private async taskIntegrations(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const keystoreKey = redis.integrations.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const integrations = await api.guilds.getIntegrations(guildId).catch(() => []); + integrations.forEach((i) => redis.integrations.set(i, guildId)); + } + + private async taskInvites(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + if ((perms.response & PermissionFlagsBits.ViewAuditLog) !== PermissionFlagsBits.ViewAuditLog) { + return; + } + } + + const keystoreKey = redis.invites.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + const guildCodestoreKey = redis.invites.codestore(guildId); + const globalCodestoreKey = redis.invites.codestore(); + + const codes = await redis.cacheDb.hkeys(guildCodestoreKey); + + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey, guildCodestoreKey); + if (codes.length > 0) await redis.cacheDb.hdel(globalCodestoreKey, ...codes); + + const invites = await api.guilds.getInvites(guildId).catch(() => []); + invites.forEach((i) => redis.invites.set(i)); + } + + //#region Utilities + + private isRateLimitError(error: unknown): boolean { + if (error && typeof error === 'object' && 'status' in error) { + return (error as { status: number }).status === 429; + } + return false; + } + + private getRateLimitRetryAfter(error: unknown): number { + if (error && typeof error === 'object') { + if ('rawError' in error) { + // eslint-disable-next-line @typescript-eslint/naming-convention + const { rawError } = error as { rawError: { retry_after?: number } }; + if (rawError?.retry_after) { + return Math.ceil(rawError.retry_after * 1000); + } + } + } + return CONFIG.DEFAULT_RETRY_AFTER; + } +} + +export const restQueue = new RestQueue(); diff --git a/src/Util/PriorityQueue/index.ts b/src/Util/PriorityQueue/index.ts new file mode 100644 index 0000000..58405e8 --- /dev/null +++ b/src/Util/PriorityQueue/index.ts @@ -0,0 +1,122 @@ +/** + * Priority Queue System + * + * Provides throttled request handling to prevent Discord API rate limits + * after Redis FLUSHDB or cold starts. + * + * Usage: + * ```ts + * import { priorityQueue } from './PriorityQueue/index.js'; + * + * // Start on bot initialization + * priorityQueue.start(); + * + * // Enqueue requests + * priorityQueue.enqueueGuildTasks(guildId, memberCount); + * priorityQueue.enqueueMembers(guildId, memberCount); + * + * // Notify when member chunks complete + * priorityQueue.onMemberChunkComplete(guildId); + * ``` + */ + +import { coldStartDetector } from './ColdStartDetector.js'; +import { gatewayQueue } from './GatewayQueue.js'; +import { restQueue } from './RestQueue.js'; +import type { GuildTaskName } from './types.js'; + +export * from './types.js'; +export { BinaryHeap } from './BinaryHeap.js'; +export { coldStartDetector } from './ColdStartDetector.js'; +export { gatewayQueue } from './GatewayQueue.js'; +export { restQueue } from './RestQueue.js'; + +/** + * Unified interface for the priority queue system + */ +export const priorityQueue = { + /** + * Start all queue processors + */ + start(): void { + gatewayQueue.start(); + restQueue.start(); + }, + + /** + * Stop all queue processors + */ + stop(): void { + gatewayQueue.stop(); + restQueue.stop(); + }, + + /** + * Enqueue all guild tasks for first guild interaction + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + enqueueGuildTasks(guildId: string, memberCount: number): void { + restQueue.enqueueGuildTasks(guildId, memberCount); + }, + + /** + * Enqueue a single guild task (for subsequent updates, not first interaction) + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + * @param taskName The specific task to enqueue + */ + enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { + restQueue.enqueueGuildTask(guildId, memberCount, taskName); + }, + + /** + * Enqueue member chunk request for a guild + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + async enqueueMembers(guildId: string, memberCount: number): Promise { + await gatewayQueue.enqueue(guildId, memberCount); + }, + + /** + * Notify that member chunks have completed for a guild + * @param guildId Guild ID + */ + onMemberChunkComplete(guildId: string): void { + gatewayQueue.onChunkComplete(guildId); + }, + + /** + * Get current queue sizes (for monitoring) + */ + getStats(): { gatewayQueue: number; restQueue: number; activeRestRequests: number } { + return { + gatewayQueue: gatewayQueue.size, + restQueue: restQueue.size, + activeRestRequests: restQueue.active, + }; + }, + + /** + * Get the guild currently waiting for member chunks + */ + get currentMemberRequestGuild(): string | null { + return gatewayQueue.currentGuild; + }, + + /** + * Whether the bot is in cold start mode (events should not be emitted) + */ + get isColdStart(): boolean { + return coldStartDetector.isColdStart; + }, + + /** + * Initialize cold start detection + * Call after guild count is known + */ + async initializeColdStartDetection(): Promise { + await coldStartDetector.initialize(); + }, +}; diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts new file mode 100644 index 0000000..10caedc --- /dev/null +++ b/src/Util/PriorityQueue/types.ts @@ -0,0 +1,83 @@ +/** + * Types for Priority Queue system + * Used to throttle Discord API requests after FLUSHDB + */ + +/** + * Item in the Gateway queue (member chunk requests) + */ +export type GatewayQueueItem = { + type: 'gateway'; + guildId: string; + memberCount: number; + addedAt: number; +}; + +/** + * Task names for REST API calls on first guild interaction + */ +export type GuildTaskName = + | 'autoModRules' + | 'commands' + | 'commandPermissions' + | 'welcomeScreen' + | 'onboarding' + | 'scheduledEvents' + | 'webhooks' + | 'integrations' + | 'invites' + | 'vcStatus'; + +/** + * Item in the REST queue + */ +export type RestQueueItem = { + type: 'guild'; + id: string; + guildId: string; + memberCount: number; + taskName: GuildTaskName; + endpoint: string; + addedAt: number; +}; + +/** + * Rate limit bucket from Discord + * Multiple routes can share the same bucket (identified by bucketHash) + */ +export type BucketState = { + /** The bucket hash from X-RateLimit-Bucket header, or pseudo-hash for unknown buckets */ + bucketHash: string; + /** Normalized route pattern (e.g., "channels/:id/pins") */ + route: string; + /** HTTP method */ + method: string; + /** Number of requests remaining before rate limit */ + remaining: number; + /** Total requests allowed per window */ + limit: number; + /** Timestamp when the bucket resets (ms since epoch) */ + resetAt: number; + /** Whether this bucket is currently blocked */ + blocked: boolean; + /** Rate limit scope: 'user', 'global', or 'shared' */ + scope: 'user' | 'global' | 'shared'; +}; + +/** + * Queue configuration + */ +export const CONFIG = { + /** Milliseconds between gateway queue processing */ + GATEWAY_INTERVAL: 100, + /** Milliseconds between REST queue processing */ + REST_INTERVAL: 100, + /** Maximum concurrent REST requests*/ + REST_MAX_CONCURRENT: 5, + /** Default retry after time in ms if not provided in 429 response */ + DEFAULT_RETRY_AFTER: 5000, + /** Redis queue size threshold for backpressure - don't start new tasks if above this */ + REDIS_QUEUE_THRESHOLD: 10000, + /** Task execution timeout in ms - prevents hanging tasks from blocking the queue */ + TASK_TIMEOUT: 30000, +} as const; diff --git a/src/Util/firstChannelInteraction.ts b/src/Util/firstChannelInteraction.ts deleted file mode 100644 index e60524b..0000000 --- a/src/Util/firstChannelInteraction.ts +++ /dev/null @@ -1,24 +0,0 @@ -import cache from '../BaseClient/Bot/Cache.js'; - -import requestChannelPins from './requestChannelPins.js'; - -export default async (channelId: string, guildId: string) => { - if (!channelId) return false; - - const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('channel-interacts', channelId); - pipeline.hset('channel-interacts', channelId, '1'); - pipeline.call('hexpire', 'channel-interacts', 604800, 'NX', 'FIELDS', 1, channelId); - }); - if (isMember === '1') return false; - - await Promise.allSettled(Object.values(tasks).map((t) => t(channelId, guildId))); - return true; -}; - -export const tasks = { - pins: async (channelId: string, guildId: string) => { - if (!guildId) return; - await requestChannelPins(channelId, guildId); - }, -}; diff --git a/src/Util/firstGuildInteraction.ts b/src/Util/firstGuildInteraction.ts index f59ad20..2d8ea13 100644 --- a/src/Util/firstGuildInteraction.ts +++ b/src/Util/firstGuildInteraction.ts @@ -1,178 +1,30 @@ -import { getGuildPerms } from '@ayako/utility'; -import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; +import { getRandom } from '@ayako/utility'; import cache from '../BaseClient/Bot/Cache.js'; -import { api, cache as clientCache } from '../BaseClient/Bot/Client.js'; +import { cache as clientCache } from '../BaseClient/Bot/Client.js'; -import requestEventSubscribers from './requestEventSubscribers.js'; -import requestGuildMembers from './requestGuildMembers.js'; -import requestVoiceChannelStatuses from './requestVoiceChannelStatuses.js'; +import { priorityQueue } from './PriorityQueue/index.js'; export default async (guildId: string) => { const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { pipeline.hget('guild-interacts', guildId); pipeline.hset('guild-interacts', guildId, '1'); - pipeline.call('hexpire', 'guild-interacts', 604800, 'NX', 'FIELDS', 1, guildId); + pipeline.call( + 'hexpire', + 'guild-interacts', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + guildId, + ); }); if (isMember === '1') return false; - await Promise.allSettled(Object.values(tasks).map((t) => t(guildId))); - return true; -}; - -export const tasks = { - vcStatus: (guildId: string) => requestVoiceChannelStatuses(guildId), - autoModRules: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const keystoreKey = cache.automods.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - const rules = await api.guilds.getAutoModerationRules(guildId).catch(() => []); - rules.forEach((r) => cache.automods.set(r)); - }, - commands: async (guildId: string) => { - if (!clientCache.user) return; - - const keystoreKey = cache.commands.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const commands = await api.applicationCommands - .getGuildCommands(clientCache.user.id, guildId) - .catch(() => []); - commands.forEach((c) => cache.guildCommands.set({ ...c, guild_id: guildId })); - }, - members: async (guildId: string) => requestGuildMembers(guildId), - commandPermissions: async (guildId: string) => { - if (!clientCache.user) return; - - const keystoreKey = cache.commandPermissions.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const commandPerms = await api.applicationCommands - .getGuildCommandsPermissions(clientCache.user.id, guildId) - .catch(() => []); - - commandPerms.forEach((command) => - command.permissions.forEach((perm) => cache.commandPermissions.set(perm, guildId, command.id)), - ); - }, - - welcomeScreen: async (guildId: string) => { - const guild = await cache.guilds.get(guildId); - if (!guild) return; - - if (!guild.features.includes(GuildFeature.WelcomeScreenEnabled)) { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - } - - const keystoreKey = cache.welcomeScreens.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const welcomeScreen = await api.guilds.getWelcomeScreen(guildId).catch(() => null); - if (!welcomeScreen) return; + const memberCount = clientCache.members.get(guildId) || 0; - cache.welcomeScreens.set(welcomeScreen, guildId); - }, - onboarding: async (guildId: string) => { - const guild = await cache.guilds.get(guildId); - if (!guild) return; + priorityQueue.enqueueGuildTasks(guildId, memberCount); + await priorityQueue.enqueueMembers(guildId, memberCount); - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const onboarding = await api.guilds.getOnboarding(guildId); - cache.onboardings.set(onboarding); - }, - scheduledEvents: async (guildId: string) => { - const keystoreKey = cache.events.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const scheduledEvents = await api.guilds - .getScheduledEvents(guildId, { with_user_count: true }) - .catch(() => []); - scheduledEvents.forEach((e) => cache.events.set(e)); - - const members = ( - await Promise.all(scheduledEvents.map((e) => requestEventSubscribers(e))) - ).flat(); - - members.forEach((u) => { - cache.users.set(u.user); - cache.eventUsers.set( - { - guild_id: guildId, - guild_scheduled_event_id: u.guildScheduledEventId, - user: u.user, - user_id: u.user.id, - member: u.member, - }, - guildId, - ); - - if (u.member) cache.members.set(u.member, guildId); - }); - }, - webhooks: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ( - (perms.response & PermissionFlagsBits.ManageWebhooks) !== - PermissionFlagsBits.ManageWebhooks - ) { - return; - } - - const keystoreKey = cache.webhooks.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const webhooks = await api.guilds.getWebhooks(guildId).catch(() => []); - webhooks.forEach((w) => cache.webhooks.set(w)); - }, - integrations: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const keystoreKey = cache.integrations.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const integrations = await api.guilds.getIntegrations(guildId).catch(() => []); - integrations.forEach((i) => cache.integrations.set(i, guildId)); - }, - invites: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - if ((perms.response & PermissionFlagsBits.ViewAuditLog) !== PermissionFlagsBits.ViewAuditLog) { - return; - } - } - - const keystoreKey = cache.invites.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - const guildCodestoreKey = cache.invites.codestore(guildId); - const globalCodestoreKey = cache.invites.codestore(); - - const codes = await cache.cacheDb.hkeys(guildCodestoreKey); - - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey, guildCodestoreKey); - if (codes.length > 0) await cache.cacheDb.hdel(globalCodestoreKey, ...codes); - - const invites = await api.guilds.getInvites(guildId).catch(() => []); - invites.forEach((i) => cache.invites.set(i)); - }, + return true; }; diff --git a/src/Util/requestChannelPins.ts b/src/Util/requestChannelPins.ts deleted file mode 100644 index c576220..0000000 --- a/src/Util/requestChannelPins.ts +++ /dev/null @@ -1,110 +0,0 @@ -import { getChannelPerms } from '@ayako/utility'; -import { PermissionFlagsBits } from 'discord-api-types/v10'; - -import redis from '../BaseClient/Bot/Cache.js'; -import { api, cache } from '../BaseClient/Bot/Client.js'; - -const requestChannelPins = async (channelId: string, guildId: string): Promise => { - if (!channelId || !guildId) return; - - if (cache.requestingPins === channelId) return; - if (cache.requestPinsQueue.has(channelId)) return; - - const [alreadyRequested] = await redis.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('channel-pins-requested', channelId); - pipeline.hset('channel-pins-requested', channelId, '1'); - pipeline.call('hexpire', 'channel-pins-requested', 300, 'NX', 'FIELDS', 1, channelId); - }); - if (alreadyRequested === '1') return; - - cache.requestPinsQueue.add(channelId); - cache.requestPinsGuildMap.set(channelId, guildId); -}; - -const processPinsRequest = async (channelId: string): Promise => { - const guildId = cache.requestPinsGuildMap.get(channelId); - if (!guildId) return; - - const channelPerms = await getChannelPerms.call(redis, guildId, cache.user?.id || '0', channelId); - const readPerms = PermissionFlagsBits.ViewAuditLog | PermissionFlagsBits.ReadMessageHistory; - if ((channelPerms.allow & readPerms) !== readPerms) return; - - await redis.pins.delAll(channelId); - - try { - const pins = await api.channels.getPins(channelId); - - for (let i = 0; i < pins.length; i++) { - const pin = pins[i]; - redis.pins.set(channelId, pin.id); - redis.messages.set(pin, guildId); - (pins as unknown[])[i] = undefined; - } - pins.length = 0; - } catch (error: unknown) { - if (isRateLimitError(error)) { - const retryAfter = getRateLimitRetryAfter(error); - - cache.requestPinsQueue.add(channelId); - cache.requestPinsGuildMap.set(channelId, guildId); - - cache.requestPinsPaused = true; - setTimeout(() => { - cache.requestPinsPaused = false; - }, retryAfter); - } - } -}; - -const isRateLimitError = (error: unknown): boolean => { - if (error && typeof error === 'object' && 'status' in error) { - return (error as { status: number }).status === 429; - } - return false; -}; - -const getRateLimitRetryAfter = (error: unknown): number => { - const DEFAULT_RETRY = 5000; - - if (error && typeof error === 'object') { - if ('rawError' in error) { - // eslint-disable-next-line @typescript-eslint/naming-convention - const { rawError } = error as { rawError: { retry_after?: number } }; - if (rawError?.retry_after) { - return Math.ceil(rawError.retry_after * 1000); - } - } - } - - return DEFAULT_RETRY; -}; - -let isProcessingPinsQueue = false; - -const processPinsQueue = async (): Promise => { - if (isProcessingPinsQueue) return; - if (cache.requestingPins) return; - if (cache.requestPinsPaused) return; - if (cache.requestPinsQueue.size === 0) return; - - const [nextChannelId] = cache.requestPinsQueue.values(); - if (!nextChannelId) return; - - isProcessingPinsQueue = true; - cache.requestPinsQueue.delete(nextChannelId); - cache.requestingPins = nextChannelId; - - try { - await processPinsRequest(nextChannelId); - } finally { - isProcessingPinsQueue = false; - cache.requestingPins = null; - cache.requestPinsGuildMap.delete(nextChannelId); - } -}; - -setInterval(() => { - processPinsQueue().catch(() => {}); -}, 500); - -export default requestChannelPins; diff --git a/src/Util/requestGuildMembers.ts b/src/Util/requestGuildMembers.ts index 4e148f7..3f3ae86 100644 --- a/src/Util/requestGuildMembers.ts +++ b/src/Util/requestGuildMembers.ts @@ -1,59 +1,10 @@ -import { GatewayOpcodes } from 'discord-api-types/gateway/v10'; +import { cache } from '../BaseClient/Bot/Client.js'; -import RedisCache from '../BaseClient/Bot/Cache.js'; -import { cache, gateway } from '../BaseClient/Bot/Client.js'; +import { priorityQueue } from './PriorityQueue/index.js'; -import calculateShardId from './calculateShardId.js'; - -const requestGuildMembers = async (guildId: string) => { - if (cache.requestingGuild !== guildId && cache.requestingGuild) { - cache.requestGuildQueue.add(guildId); - return; - } - - cache.requestingGuild = guildId; - - const [isMember] = await RedisCache.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('guild-members-requested', guildId); - pipeline.hset('guild-members-requested', guildId, '1'); - pipeline.call('hexpire', 'guild-members-requested', 604800, 'NX', 'FIELDS', 1, guildId); - }); - - if (isMember === '1') { - cache.requestingGuild = null; - return; - } - - gateway.send(calculateShardId(guildId), { - op: GatewayOpcodes.RequestGuildMembers, - d: { guild_id: guildId, presences: false, limit: 0, query: '' }, - }); -}; - -let isProcessingGuildQueue = false; - -const processGuildQueue = async (): Promise => { - if (isProcessingGuildQueue) return; - if (cache.requestingGuild) return; - if (cache.requestGuildQueue.size === 0) return; - - isProcessingGuildQueue = true; - try { - const [nextGuild] = [...cache.requestGuildQueue.values()] - .map((id) => ({ id, members: cache.members.get(id) || 0 })) - .sort((a, b) => b.members - a.members); - - if (!nextGuild) return; - - cache.requestGuildQueue.delete(nextGuild.id); - await requestGuildMembers(nextGuild.id); - } finally { - isProcessingGuildQueue = false; - } +const requestGuildMembers = async (guildId: string): Promise => { + const memberCount = cache.members.get(guildId) || 0; + await priorityQueue.enqueueMembers(guildId, memberCount); }; -setInterval(() => { - processGuildQueue().catch(() => {}); -}, 100); - export default requestGuildMembers; diff --git a/src/bot.ts b/src/bot.ts index 7a475c6..0b1896f 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -1,16 +1,33 @@ /* eslint-disable no-console */ +import { heapStats } from 'bun:jsc'; import 'dotenv/config'; import './BaseClient/Bot/Client.js'; +import { getInfo } from 'discord-hybrid-sharding'; + +import { priorityQueue } from './Util/PriorityQueue/index.js'; if (process.argv.includes('--debug')) console.log('[Debug] Debug mode enabled'); if (process.argv.includes('--debug-db')) console.log('[Debug] Debug mode for database enabled'); if (process.argv.includes('--warn')) console.log('[Debug] Warn mode enabled'); if (process.argv.includes('--silent')) console.log('[Debug] Silent mode enabled'); +const clusterId = getInfo().CLUSTER; + +setInterval(() => { + const stats = heapStats(); + const mem = process.memoryUsage(); + const queueStats = priorityQueue.getStats(); + console.log( + `[Shard ${clusterId}] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount} | GatewayQ: ${queueStats.gatewayQueue} | RestQ: ${queueStats.restQueue} (${queueStats.activeRestRequests} active)`, + ); +}, 30000); + (async () => { - [ - './BaseClient/Bot/Events/Gateway.js', - './BaseClient/Bot/Events/Process.js', - './BaseClient/Bot/Events/Rest.js', - ].forEach((fileName) => import(fileName)); + await Promise.all([ + import('./BaseClient/Bot/Events/Gateway.js'), + import('./BaseClient/Bot/Events/Process.js'), + import('./BaseClient/Bot/Events/Rest.js'), + ]); + + priorityQueue.start(); })(); diff --git a/src/index.ts b/src/index.ts index 41e0e06..a4e67da 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ /* eslint-disable no-console */ +import { heapStats } from 'bun:jsc'; import { scheduleJob } from 'node-schedule'; console.clear(); @@ -8,13 +9,19 @@ console.log('+ --debug --warn --dev --local +'); console.log('+++++++++++++++++++++++++++++++++++++++++++++++++'); (async () => { - [ - './BaseClient/Cluster/Manager.js', - './BaseClient/Cluster/Events.js', - './BaseClient/Cluster/Stats.js', - ].forEach((fileName) => import(fileName)); + await import('./BaseClient/Cluster/Manager.js'); + await import('./BaseClient/Cluster/Events.js'); + await import('./BaseClient/Cluster/Stats.js'); })(); +setInterval(() => { + const stats = heapStats(); + const mem = process.memoryUsage(); + console.log( + `[Memory] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount}`, + ); +}, 30000); + scheduleJob('*/10 * * * *', async () => { console.log(`=> Current Date: ${new Date().toLocaleString()}`); });