From 9d3451b8a30d9c09d4f75ecd5c921eb215de8b04 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Wed, 14 Jan 2026 20:56:57 +0100 Subject: [PATCH 01/21] refactor: update Dockerfile and package.json for bun compatibility; streamline Redis integration --- Dockerfile | 26 +++++++++++++--------- package.json | 14 ++++++------ src/BaseClient/Bot/CacheHandlers/Guilds.ts | 22 +++++++++--------- src/BaseClient/Cluster/Manager.ts | 7 +----- src/BaseClient/Cluster/Redis.ts | 4 ++-- 5 files changed, 36 insertions(+), 37 deletions(-) diff --git a/Dockerfile b/Dockerfile index 022002a..a9fd30b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,15 +1,19 @@ -FROM node:alpine -WORKDIR /app +FROM oven/bun:1.3 -COPY ./package.json /app/package.json -COPY ./pnpm-lock.yaml /app/pnpm-lock.yaml -RUN npm install -g pnpm - -RUN pnpm install -RUN pnpm build +RUN apt-get update && apt-get install -y \ + build-essential \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* COPY . /app -WORKDIR /app -COPY ./.env /app/.env -WORKDIR /app \ No newline at end of file +WORKDIR /app/Ayako/packages/Utility +RUN rm -rf ./dist +RUN bun install +RUN bun run build + +WORKDIR /app/Ayako/packages/Gateway +COPY ./.env /app/Ayako/packages/Gateway/.env +RUN rm -rf ./dist +RUN bun install +RUN bun run build diff --git a/package.json b/package.json index d512149..8f4dd23 100644 --- a/package.json +++ b/package.json @@ -4,12 +4,12 @@ "private": true, "type": "module", "scripts": { - "build": "swc src -d dist --strip-leading-paths --copy-files", - "dev": "pnpm build && node --enable-source-maps --experimental-wasm-modules --experimental-json-modules ./dist/index.js --debug --warn --dev --debug-db --local", - "lint": "pnpx eslint 'src/**/*.ts' --fix", - "run": "node --max-old-space-size=64 --no-deprecation --no-warnings --experimental-json-modules ./dist/index.js", - "start": "pnpm run run", - "watch": "swc src -d dist --strip-leading-paths --copy-files --watch" + "build": "bun build src/index.ts --outdir dist --target bun", + "dev": "bun run build && bun ./dist/index.js --debug --warn --dev --debug-db --local", + "lint": "bunx eslint 'src/**/*.ts' --fix", + "run": "bun ./dist/index.js", + "start": "bun run run", + "watch": "bun build src/index.ts --outdir dist --target bun --watch" }, "dependencies": { "@discordjs/core": "^2.4.0", @@ -19,7 +19,6 @@ "discord-hybrid-sharding": "^3.0.1", "dotenv": "^17.2.3", "glob": "^13.0.0", - "ioredis": "^5.9.1", "node-schedule": "^2.1.1", "prom-client": "^15.1.3" }, @@ -32,6 +31,7 @@ "@swc/core": "1.15.8", "@total-typescript/ts-reset": "^0.6.1", "@types/glob": "^9.0.0", + "@types/bun": "latest", "@types/node": "^25.0.7", "@types/node-schedule": "^2.1.8", "@typescript-eslint/eslint-plugin": "^8.53.0", diff --git a/src/BaseClient/Bot/CacheHandlers/Guilds.ts b/src/BaseClient/Bot/CacheHandlers/Guilds.ts index 54d9139..29fc86f 100644 --- a/src/BaseClient/Bot/CacheHandlers/Guilds.ts +++ b/src/BaseClient/Bot/CacheHandlers/Guilds.ts @@ -66,7 +66,7 @@ export default { const rGuild = redis.guilds.apiToR(data); if (rGuild) { const guildJson = JSON.stringify(rGuild); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:guilds:${guildId}:current`, guildJson, 'EX', 604800); p.hset('keystore:guilds', `cache:guilds:${guildId}`, 0); }); @@ -84,14 +84,14 @@ export default { if (rMember) { const memberJson = JSON.stringify(rMember); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:members:${guildId}:${userId}:current`, memberJson, 'EX', 604800); p.hset(`keystore:members:${guildId}`, `cache:members:${guildId}:${userId}`, 0); }); } if (rUser) { const userJson = JSON.stringify(rUser); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:users:${userId}:current`, userJson, 'EX', 604800); }); } @@ -104,7 +104,7 @@ export default { (data.channels as unknown[])[i] = undefined; if (rChannel) { const channelJson = JSON.stringify(rChannel); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:channels:${channel.id}:current`, channelJson, 'EX', 604800); p.hset(`keystore:channels:${guildId}`, `cache:channels:${channel.id}`, 0); }); @@ -118,7 +118,7 @@ export default { (data.roles as unknown[])[i] = undefined; if (rRole) { const roleJson = JSON.stringify(rRole); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:roles:${role.id}:current`, roleJson, 'EX', 604800); p.hset(`keystore:roles:${guildId}`, `cache:roles:${role.id}`, 0); }); @@ -133,7 +133,7 @@ export default { (data.emojis as unknown[])[i] = undefined; if (rEmoji) { const emojiJson = JSON.stringify(rEmoji); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:emojis:${emoji.id}:current`, emojiJson, 'EX', 604800); p.hset(`keystore:emojis:${guildId}`, `cache:emojis:${emoji.id}`, 0); }); @@ -147,7 +147,7 @@ export default { (data.stickers as unknown[])[i] = undefined; if (rSticker) { const stickerJson = JSON.stringify(rSticker); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:stickers:${sticker.id}:current`, stickerJson, 'EX', 604800); p.hset(`keystore:stickers:${guildId}`, `cache:stickers:${sticker.id}`, 0); }); @@ -161,7 +161,7 @@ export default { (data.soundboard_sounds as unknown[])[i] = undefined; if (rSound) { const soundJson = JSON.stringify(rSound); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:soundboards:${sound.sound_id}:current`, soundJson, 'EX', 604800); }); } @@ -176,7 +176,7 @@ export default { (data.voice_states as unknown[])[i] = undefined; if (rVoice) { const voiceJson = JSON.stringify(rVoice); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:voices:${guildId}:${voice.user_id}:current`, voiceJson, 'EX', 604800); p.hset(`keystore:voices:${guildId}`, `cache:voices:${guildId}:${voice.user_id}`, 0); }); @@ -191,7 +191,7 @@ export default { (data.threads as unknown[])[i] = undefined; if (rThread) { const threadJson = JSON.stringify(rThread); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:threads:${thread.id}:current`, threadJson, 'EX', 604800); p.hset(`keystore:threads:${guildId}`, `cache:threads:${thread.id}`, 0); }); @@ -205,7 +205,7 @@ export default { (data.guild_scheduled_events as unknown[])[i] = undefined; if (rEvent) { const eventJson = JSON.stringify(rEvent); - redis.batcher.queueSync((p) => { + redis.queueSync((p) => { p.set(`cache:events:${event.id}:current`, eventJson, 'EX', 604800); p.hset(`keystore:events:${event.guild_id}`, `cache:events:${event.id}`, 0); }); diff --git a/src/BaseClient/Cluster/Manager.ts b/src/BaseClient/Cluster/Manager.ts index 19e5582..b2d25f4 100644 --- a/src/BaseClient/Cluster/Manager.ts +++ b/src/BaseClient/Cluster/Manager.ts @@ -9,12 +9,7 @@ const manager = new ClusterManager('./dist/bot.js', { shardsPerClusters: 10, token: (process.argv.includes('--dev') ? process.env.DevToken : process.env.Token) ?? '', shardArgs: process.argv, - execArgv: [ - '--max-old-space-size=512', - '--experimental-json-modules', - '--inspect=0.0.0.0:9229', - ...(process.argv.includes('--dev') ? [] : ['--no-deprecation', '--no-warnings']), - ], + execArgv: [], respawn: true, mode: 'process', }); diff --git a/src/BaseClient/Cluster/Redis.ts b/src/BaseClient/Cluster/Redis.ts index 420cb1e..ccd9a9b 100644 --- a/src/BaseClient/Cluster/Redis.ts +++ b/src/BaseClient/Cluster/Redis.ts @@ -1,11 +1,11 @@ -import Redis from 'ioredis'; +import { BunRedisWrapper } from '@ayako/utility'; const cacheDBnum = process.argv.includes('--dev') ? process.env.devCacheDB : process.env.cacheDB; if (!cacheDBnum && typeof cacheDBnum !== 'number') { throw new Error('No cache DB number provided in env vars'); } -export const cacheDB = new Redis({ +export const cacheDB = new BunRedisWrapper({ host: process.argv.includes('--local') ? 'localhost' : 'redis', db: Number(cacheDBnum), }); From a275a1c90f4181ca9ea2fa22abbea92d8e8fd341 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Wed, 14 Jan 2026 23:33:45 +0100 Subject: [PATCH 02/21] refactor: replace bun commands with pnpm in Dockerfile and package.json for improved compatibility --- Dockerfile | 10 ++++++---- package.json | 10 +++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index a9fd30b..53d0a3c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,15 +5,17 @@ RUN apt-get update && apt-get install -y \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* +RUN npm install -g pnpm + COPY . /app WORKDIR /app/Ayako/packages/Utility RUN rm -rf ./dist -RUN bun install -RUN bun run build +RUN pnpm install +RUN pnpm run build WORKDIR /app/Ayako/packages/Gateway COPY ./.env /app/Ayako/packages/Gateway/.env RUN rm -rf ./dist -RUN bun install -RUN bun run build +RUN pnpm install +RUN pnpm run build diff --git a/package.json b/package.json index 8f4dd23..751bb82 100644 --- a/package.json +++ b/package.json @@ -4,12 +4,12 @@ "private": true, "type": "module", "scripts": { - "build": "bun build src/index.ts --outdir dist --target bun", - "dev": "bun run build && bun ./dist/index.js --debug --warn --dev --debug-db --local", - "lint": "bunx eslint 'src/**/*.ts' --fix", + "build": "swc src -d dist --strip-leading-paths --copy-files", + "dev": "pnpm build && bun ./dist/index.js --debug --warn --dev --debug-db --local", + "lint": "pnpx eslint 'src/**/*.ts' --fix", "run": "bun ./dist/index.js", - "start": "bun run run", - "watch": "bun build src/index.ts --outdir dist --target bun --watch" + "start": "pnpm run run", + "watch": "swc src -d dist --strip-leading-paths --copy-files --watch" }, "dependencies": { "@discordjs/core": "^2.4.0", From f0e46c70bb832b1614a8ddc215033feedb8eb9aa Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Thu, 15 Jan 2026 00:01:34 +0100 Subject: [PATCH 03/21] refactor: simplify module imports in index.ts for improved readability --- src/index.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/index.ts b/src/index.ts index 41e0e06..17ccde1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,11 +8,9 @@ console.log('+ --debug --warn --dev --local +'); console.log('+++++++++++++++++++++++++++++++++++++++++++++++++'); (async () => { - [ - './BaseClient/Cluster/Manager.js', - './BaseClient/Cluster/Events.js', - './BaseClient/Cluster/Stats.js', - ].forEach((fileName) => import(fileName)); + await import('./BaseClient/Cluster/Manager.js'); + await import('./BaseClient/Cluster/Events.js'); + await import('./BaseClient/Cluster/Stats.js'); })(); scheduleJob('*/10 * * * *', async () => { From 6a3ce42f0f663297b9eaaa6d3a7a6b0a2f50da4d Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Thu, 15 Jan 2026 01:33:30 +0100 Subject: [PATCH 04/21] refactor: streamline ClusterManager startup process by removing unnecessary interval logic --- src/BaseClient/Cluster/Manager.ts | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/BaseClient/Cluster/Manager.ts b/src/BaseClient/Cluster/Manager.ts index b2d25f4..15baf5d 100644 --- a/src/BaseClient/Cluster/Manager.ts +++ b/src/BaseClient/Cluster/Manager.ts @@ -14,21 +14,14 @@ const manager = new ClusterManager('./dist/bot.js', { mode: 'process', }); -await manager - .spawn() - .then(() => { - setInterval(async () => { - await manager.broadcastEval('this.status && this.isReady() ? this.reconnect() : 0'); - }, 60000); - }) - .catch((e: Response) => { - console.log( - `[Cluster Manager] Startup Failed. Retry after: ${ - Number(e.headers?.get('retry-after') ?? 0) / 60 - } Minutes`, - ); - console.error(e); - process.exit(1); - }); +await manager.spawn().catch((e: Response) => { + console.log( + `[Cluster Manager] Startup Failed. Retry after: ${ + Number(e.headers?.get('retry-after') ?? 0) / 60 + } Minutes`, + ); + console.error(e); + process.exit(1); +}); export default manager; From a7b2a5ee53111dacabeefac12c892f986cf7ccd5 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Thu, 15 Jan 2026 01:59:39 +0100 Subject: [PATCH 05/21] refactor: enhance memory usage logging in index.ts and update run script for debugging --- package.json | 2 +- src/index.ts | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 751bb82..2593c36 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "build": "swc src -d dist --strip-leading-paths --copy-files", "dev": "pnpm build && bun ./dist/index.js --debug --warn --dev --debug-db --local", "lint": "pnpx eslint 'src/**/*.ts' --fix", - "run": "bun ./dist/index.js", + "run": "bun ./dist/index.js --debug", "start": "pnpm run run", "watch": "swc src -d dist --strip-leading-paths --copy-files --watch" }, diff --git a/src/index.ts b/src/index.ts index 17ccde1..a4e67da 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ /* eslint-disable no-console */ +import { heapStats } from 'bun:jsc'; import { scheduleJob } from 'node-schedule'; console.clear(); @@ -13,6 +14,14 @@ console.log('+++++++++++++++++++++++++++++++++++++++++++++++++'); await import('./BaseClient/Cluster/Stats.js'); })(); +setInterval(() => { + const stats = heapStats(); + const mem = process.memoryUsage(); + console.log( + `[Memory] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount}`, + ); +}, 30000); + scheduleJob('*/10 * * * *', async () => { console.log(`=> Current Date: ${new Date().toLocaleString()}`); }); From efb7c1d0247141380f73db7064a29b2baa55412b Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Thu, 15 Jan 2026 02:37:05 +0100 Subject: [PATCH 06/21] refactor: enhance memory usage logging in bot.ts and improve module import handling --- src/bot.ts | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/src/bot.ts b/src/bot.ts index 7a475c6..96acaca 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -1,16 +1,28 @@ /* eslint-disable no-console */ +import { heapStats } from 'bun:jsc'; import 'dotenv/config'; import './BaseClient/Bot/Client.js'; +import { getInfo } from 'discord-hybrid-sharding'; if (process.argv.includes('--debug')) console.log('[Debug] Debug mode enabled'); if (process.argv.includes('--debug-db')) console.log('[Debug] Debug mode for database enabled'); if (process.argv.includes('--warn')) console.log('[Debug] Warn mode enabled'); if (process.argv.includes('--silent')) console.log('[Debug] Silent mode enabled'); +const clusterId = getInfo().CLUSTER; + +setInterval(() => { + const stats = heapStats(); + const mem = process.memoryUsage(); + console.log( + `[Shard ${clusterId}] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount}`, + ); +}, 30000); + (async () => { - [ - './BaseClient/Bot/Events/Gateway.js', - './BaseClient/Bot/Events/Process.js', - './BaseClient/Bot/Events/Rest.js', - ].forEach((fileName) => import(fileName)); + await Promise.all([ + import('./BaseClient/Bot/Events/Gateway.js'), + import('./BaseClient/Bot/Events/Process.js'), + import('./BaseClient/Bot/Events/Rest.js'), + ]); })(); From 96624ceb06c61334b2fe14601258b429c3c41f2b Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Thu, 15 Jan 2026 13:45:43 +0100 Subject: [PATCH 07/21] refactor: correct cache method from cacheDb to cachePub in EventBus.ts --- src/Util/EventBus.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Util/EventBus.ts b/src/Util/EventBus.ts index 7e5ec28..644f81a 100644 --- a/src/Util/EventBus.ts +++ b/src/Util/EventBus.ts @@ -6,7 +6,7 @@ const emit = (type: GatewayDispatchEvents, data: GatewayDispatchPayload['d']) => // eslint-disable-next-line no-console if (process.argv.includes('--debug')) console.log(`[EventBus] Emitting event: ${type}`); - cache.cacheDb.publish(type, JSON.stringify(data)); + cache.cachePub.publish(type, JSON.stringify(data)); }; export default emit; From 676a024e8301068c0d2919167de8fbcd13aa48b1 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Fri, 16 Jan 2026 01:23:52 +0100 Subject: [PATCH 08/21] refactor: replace HGETALL with HSCAN for non-blocking access to large keystores in cache handlers --- src/BaseClient/Bot/CacheHandlers/Channel.ts | 14 ++- src/BaseClient/Bot/CacheHandlers/Guilds.ts | 118 ++++++++++---------- src/BaseClient/Bot/CacheHandlers/Message.ts | 27 +++-- src/BaseClient/Bot/CacheHandlers/Thread.ts | 30 +++-- 4 files changed, 96 insertions(+), 93 deletions(-) diff --git a/src/BaseClient/Bot/CacheHandlers/Channel.ts b/src/BaseClient/Bot/CacheHandlers/Channel.ts index 0a6d9ed..54437cc 100644 --- a/src/BaseClient/Bot/CacheHandlers/Channel.ts +++ b/src/BaseClient/Bot/CacheHandlers/Channel.ts @@ -20,14 +20,16 @@ export default { redis.pins.delAll(data.id); redis.channelStatus.del(data.guild_id, data.id); - const pipeline = redis.cacheDb.pipeline(); - const messages = await redis.cacheDb.hgetall(redis.messages.keystore(data.guild_id)); - - pipeline.hdel( + const messageKeys = await redis.cacheDb.hscanKeys( redis.messages.keystore(data.guild_id), - ...Object.keys(messages).filter((m) => m.includes(data.id)), + `*${data.id}*`, ); - pipeline.del(...Object.keys(messages).filter((m) => m.includes(data.id))); + + if (messageKeys.length === 0) return; + + const pipeline = redis.cacheDb.pipeline(); + pipeline.hdel(redis.messages.keystore(data.guild_id), ...messageKeys); + pipeline.del(...messageKeys); await pipeline.exec(); }, diff --git a/src/BaseClient/Bot/CacheHandlers/Guilds.ts b/src/BaseClient/Bot/CacheHandlers/Guilds.ts index 29fc86f..c7a0486 100644 --- a/src/BaseClient/Bot/CacheHandlers/Guilds.ts +++ b/src/BaseClient/Bot/CacheHandlers/Guilds.ts @@ -227,36 +227,6 @@ export default { redis.channelStatus.delAll(data.id); redis.pins.delAll(data.id); - const getPipeline = redis.cacheDb.pipeline(); - - getPipeline.hgetall(redis.audits.keystore(data.id)); - getPipeline.hgetall(redis.automods.keystore(data.id)); - getPipeline.hgetall(redis.bans.keystore(data.id)); - getPipeline.hgetall(redis.channels.keystore(data.id)); - getPipeline.hgetall(redis.commandPermissions.keystore(data.id)); - getPipeline.hgetall(redis.emojis.keystore(data.id)); - getPipeline.hgetall(redis.events.keystore(data.id)); - getPipeline.hgetall(redis.guildCommands.keystore(data.id)); - getPipeline.hgetall(redis.integrations.keystore(data.id)); - getPipeline.hgetall(redis.invites.keystore(data.id)); - getPipeline.hgetall(redis.members.keystore(data.id)); - getPipeline.hgetall(redis.messages.keystore(data.id)); - getPipeline.hgetall(redis.reactions.keystore(data.id)); - getPipeline.hgetall(redis.roles.keystore(data.id)); - getPipeline.hgetall(redis.soundboards.keystore(data.id)); - getPipeline.hgetall(redis.stages.keystore(data.id)); - getPipeline.hgetall(redis.stickers.keystore(data.id)); - getPipeline.hgetall(redis.threads.keystore(data.id)); - getPipeline.hgetall(redis.threadMembers.keystore(data.id)); - getPipeline.hgetall(redis.voices.keystore(data.id)); - getPipeline.hgetall(redis.webhooks.keystore(data.id)); - getPipeline.hgetall(redis.welcomeScreens.keystore(data.id)); - getPipeline.hgetall(redis.onboardings.keystore(data.id)); - getPipeline.hgetall(redis.eventUsers.keystore(data.id)); - - const results = await getPipeline.exec(); - if (!results) return; - const [ auditlogs, automods, @@ -282,9 +252,35 @@ export default { welcomeScreens, onboarding, eventUsers, - ] = results.map((result) => result[1] || {}); + ] = await Promise.all([ + redis.cacheDb.hscanKeys(redis.audits.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.automods.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.bans.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.channels.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.commandPermissions.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.emojis.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.events.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.guildCommands.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.integrations.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.invites.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.members.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.messages.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.reactions.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.roles.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.soundboards.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.stages.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.stickers.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.threads.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.threadMembers.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.voices.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.webhooks.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.welcomeScreens.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.onboardings.keystore(data.id)), + redis.cacheDb.hscanKeys(redis.eventUsers.keystore(data.id)), + ]); const deletePipeline = redis.cacheDb.pipeline(); + deletePipeline.del(redis.guilds.keystore(data.id)); deletePipeline.del(redis.audits.keystore(data.id)); deletePipeline.del(redis.automods.keystore(data.id)); @@ -311,30 +307,30 @@ export default { deletePipeline.del(redis.onboardings.keystore(data.id)); deletePipeline.del(redis.eventUsers.keystore(data.id)); - deletePipeline.del(...Object.keys(auditlogs)); - deletePipeline.del(...Object.keys(automods)); - deletePipeline.del(...Object.keys(bans)); - deletePipeline.del(...Object.keys(channels)); - deletePipeline.del(...Object.keys(commandPermissions)); - deletePipeline.del(...Object.keys(emojis)); - deletePipeline.del(...Object.keys(events)); - deletePipeline.del(...Object.keys(guildCommands)); - deletePipeline.del(...Object.keys(integrations)); - deletePipeline.del(...Object.keys(invites)); - deletePipeline.del(...Object.keys(members)); - deletePipeline.del(...Object.keys(messages)); - deletePipeline.del(...Object.keys(reactions)); - deletePipeline.del(...Object.keys(roles)); - deletePipeline.del(...Object.keys(soundboards)); - deletePipeline.del(...Object.keys(stages)); - deletePipeline.del(...Object.keys(stickers)); - deletePipeline.del(...Object.keys(threads)); - deletePipeline.del(...Object.keys(threadMembers)); - deletePipeline.del(...Object.keys(voices)); - deletePipeline.del(...Object.keys(webhooks)); - deletePipeline.del(...Object.keys(welcomeScreens)); - deletePipeline.del(...Object.keys(onboarding)); - deletePipeline.del(...Object.keys(eventUsers)); + deletePipeline.del(...auditlogs); + deletePipeline.del(...automods); + deletePipeline.del(...bans); + deletePipeline.del(...channels); + deletePipeline.del(...commandPermissions); + deletePipeline.del(...emojis); + deletePipeline.del(...events); + deletePipeline.del(...guildCommands); + deletePipeline.del(...integrations); + deletePipeline.del(...invites); + deletePipeline.del(...members); + deletePipeline.del(...messages); + deletePipeline.del(...reactions); + deletePipeline.del(...roles); + deletePipeline.del(...soundboards); + deletePipeline.del(...stages); + deletePipeline.del(...stickers); + deletePipeline.del(...threads); + deletePipeline.del(...threadMembers); + deletePipeline.del(...voices); + deletePipeline.del(...webhooks); + deletePipeline.del(...welcomeScreens); + deletePipeline.del(...onboarding); + deletePipeline.del(...eventUsers); await deletePipeline.exec(); }, @@ -348,9 +344,9 @@ export default { firstGuildInteraction(data.guild_id); cache.emojis.set(data.guild_id, data.emojis.length); - const emojis = await redis.cacheDb.hgetall(redis.emojis.keystore(data.guild_id)); + const emojiKeys = await redis.cacheDb.hscanKeys(redis.emojis.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(emojis)); + pipeline.del(...emojiKeys); pipeline.del(redis.emojis.keystore(data.guild_id)); await pipeline.exec(); @@ -536,9 +532,9 @@ export default { firstGuildInteraction(data.guild_id); cache.sounds.set(data.guild_id, data.soundboard_sounds.length); - const sounds = await redis.cacheDb.hgetall(redis.soundboards.keystore(data.guild_id)); + const soundKeys = await redis.cacheDb.hscanKeys(redis.soundboards.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(sounds)); + pipeline.del(...soundKeys); pipeline.del(redis.soundboards.keystore(data.guild_id)); await pipeline.exec(); @@ -553,9 +549,9 @@ export default { firstGuildInteraction(data.guild_id); cache.stickers.set(data.guild_id, data.stickers.length); - const stickers = await redis.cacheDb.hgetall(redis.stickers.keystore(data.guild_id)); + const stickerKeys = await redis.cacheDb.hscanKeys(redis.stickers.keystore(data.guild_id)); const pipeline = redis.cacheDb.pipeline(); - pipeline.del(...Object.keys(stickers)); + pipeline.del(...stickerKeys); pipeline.del(redis.stickers.keystore(data.guild_id)); await pipeline.exec(); diff --git a/src/BaseClient/Bot/CacheHandlers/Message.ts b/src/BaseClient/Bot/CacheHandlers/Message.ts index e503164..72fbd21 100644 --- a/src/BaseClient/Bot/CacheHandlers/Message.ts +++ b/src/BaseClient/Bot/CacheHandlers/Message.ts @@ -162,13 +162,16 @@ export default { firstChannelInteraction(data.channel_id, data.guild_id); } - const pipeline = redis.cacheDb.pipeline(); - const reactions = await redis.cacheDb.hgetall(redis.reactions.keystore(data.message_id)); - pipeline.hdel( + const reactionKeys = await redis.cacheDb.hscanKeys( redis.reactions.keystore(data.message_id), - ...Object.keys(reactions).filter((r) => r.includes(data.message_id)), + `*${data.message_id}*`, ); - pipeline.del(...Object.keys(reactions).filter((r) => r.includes(data.message_id))); + + if (reactionKeys.length === 0) return; + + const pipeline = redis.cacheDb.pipeline(); + pipeline.hdel(redis.reactions.keystore(data.message_id), ...reactionKeys); + pipeline.del(...reactionKeys); await pipeline.exec(); }, @@ -179,13 +182,17 @@ export default { firstGuildInteraction(data.guild_id); firstChannelInteraction(data.channel_id, data.guild_id); - const reactions = await redis.cacheDb.hgetall(redis.reactions.keystore(data.guild_id)); - - const pipeline = redis.cacheDb.pipeline(); - const filteredReactions = Object.keys(reactions).filter( - (r) => r.includes(data.message_id) && r.includes((data.emoji.id || data.emoji.name)!), + const emojiId = data.emoji.id || data.emoji.name; + const reactionKeys = await redis.cacheDb.hscanKeys( + redis.reactions.keystore(data.guild_id), + `*${data.message_id}*`, ); + const filteredReactions = reactionKeys.filter((r) => r.includes(emojiId!)); + + if (filteredReactions.length === 0) return; + + const pipeline = redis.cacheDb.pipeline(); pipeline.hdel(redis.reactions.keystore(data.guild_id), ...filteredReactions); pipeline.del(...filteredReactions); await pipeline.exec(); diff --git a/src/BaseClient/Bot/CacheHandlers/Thread.ts b/src/BaseClient/Bot/CacheHandlers/Thread.ts index 0083bf9..b646870 100644 --- a/src/BaseClient/Bot/CacheHandlers/Thread.ts +++ b/src/BaseClient/Bot/CacheHandlers/Thread.ts @@ -26,26 +26,24 @@ export default { firstGuildInteraction(data.guild_id); - const selectPipeline = redis.cacheDb.pipeline(); - selectPipeline.hgetall(redis.threadMembers.keystore(data.guild_id)); - selectPipeline.hgetall(redis.messages.keystore(data.guild_id)); - const result = await selectPipeline.exec(); - if (!result) return; + const [threadMemberKeys, messageKeys] = await Promise.all([ + redis.cacheDb.hscanKeys(redis.threadMembers.keystore(data.guild_id), `*${data.id}*`), + redis.cacheDb.hscanKeys(redis.messages.keystore(data.guild_id), `*${data.id}*`), + ]); + + if (threadMemberKeys.length === 0 && messageKeys.length === 0) return; - const [threadMembers, messages] = result; const deletePipeline = redis.cacheDb.pipeline(); - deletePipeline.hdel( - redis.threadMembers.keystore(data.guild_id), - ...Object.keys(threadMembers).filter((m) => m.includes(data.id)), - ); - deletePipeline.del(...Object.keys(threadMembers).filter((m) => m.includes(data.id))); + if (threadMemberKeys.length > 0) { + deletePipeline.hdel(redis.threadMembers.keystore(data.guild_id), ...threadMemberKeys); + deletePipeline.del(...threadMemberKeys); + } - deletePipeline.hdel( - redis.messages.keystore(data.guild_id), - ...Object.keys(messages).filter((m) => m.includes(data.id)), - ); - deletePipeline.del(...Object.keys(messages).filter((m) => m.includes(data.id))); + if (messageKeys.length > 0) { + deletePipeline.hdel(redis.messages.keystore(data.guild_id), ...messageKeys); + deletePipeline.del(...messageKeys); + } await deletePipeline.exec(); }, From 371d4da9064156c7810c85df1bb8241333a33662 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Fri, 16 Jan 2026 13:14:56 +0100 Subject: [PATCH 09/21] refactor: replace BunRedisWrapper with createRedisWrapper for Redis connection --- src/BaseClient/Bot/CacheHandlers/Guilds.ts | 23 +++++++++++----------- src/BaseClient/Cluster/Redis.ts | 4 ++-- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/BaseClient/Bot/CacheHandlers/Guilds.ts b/src/BaseClient/Bot/CacheHandlers/Guilds.ts index c7a0486..c500a6f 100644 --- a/src/BaseClient/Bot/CacheHandlers/Guilds.ts +++ b/src/BaseClient/Bot/CacheHandlers/Guilds.ts @@ -1,3 +1,4 @@ +import type { ChainableCommanderInterface } from '@ayako/utility'; import { GatewayDispatchEvents, type GatewayGuildAuditLogEntryCreateDispatchData, @@ -66,7 +67,7 @@ export default { const rGuild = redis.guilds.apiToR(data); if (rGuild) { const guildJson = JSON.stringify(rGuild); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:guilds:${guildId}:current`, guildJson, 'EX', 604800); p.hset('keystore:guilds', `cache:guilds:${guildId}`, 0); }); @@ -84,14 +85,14 @@ export default { if (rMember) { const memberJson = JSON.stringify(rMember); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:members:${guildId}:${userId}:current`, memberJson, 'EX', 604800); p.hset(`keystore:members:${guildId}`, `cache:members:${guildId}:${userId}`, 0); }); } if (rUser) { const userJson = JSON.stringify(rUser); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:users:${userId}:current`, userJson, 'EX', 604800); }); } @@ -104,7 +105,7 @@ export default { (data.channels as unknown[])[i] = undefined; if (rChannel) { const channelJson = JSON.stringify(rChannel); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:channels:${channel.id}:current`, channelJson, 'EX', 604800); p.hset(`keystore:channels:${guildId}`, `cache:channels:${channel.id}`, 0); }); @@ -118,7 +119,7 @@ export default { (data.roles as unknown[])[i] = undefined; if (rRole) { const roleJson = JSON.stringify(rRole); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:roles:${role.id}:current`, roleJson, 'EX', 604800); p.hset(`keystore:roles:${guildId}`, `cache:roles:${role.id}`, 0); }); @@ -133,7 +134,7 @@ export default { (data.emojis as unknown[])[i] = undefined; if (rEmoji) { const emojiJson = JSON.stringify(rEmoji); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:emojis:${emoji.id}:current`, emojiJson, 'EX', 604800); p.hset(`keystore:emojis:${guildId}`, `cache:emojis:${emoji.id}`, 0); }); @@ -147,7 +148,7 @@ export default { (data.stickers as unknown[])[i] = undefined; if (rSticker) { const stickerJson = JSON.stringify(rSticker); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:stickers:${sticker.id}:current`, stickerJson, 'EX', 604800); p.hset(`keystore:stickers:${guildId}`, `cache:stickers:${sticker.id}`, 0); }); @@ -161,7 +162,7 @@ export default { (data.soundboard_sounds as unknown[])[i] = undefined; if (rSound) { const soundJson = JSON.stringify(rSound); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:soundboards:${sound.sound_id}:current`, soundJson, 'EX', 604800); }); } @@ -176,7 +177,7 @@ export default { (data.voice_states as unknown[])[i] = undefined; if (rVoice) { const voiceJson = JSON.stringify(rVoice); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:voices:${guildId}:${voice.user_id}:current`, voiceJson, 'EX', 604800); p.hset(`keystore:voices:${guildId}`, `cache:voices:${guildId}:${voice.user_id}`, 0); }); @@ -191,7 +192,7 @@ export default { (data.threads as unknown[])[i] = undefined; if (rThread) { const threadJson = JSON.stringify(rThread); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:threads:${thread.id}:current`, threadJson, 'EX', 604800); p.hset(`keystore:threads:${guildId}`, `cache:threads:${thread.id}`, 0); }); @@ -205,7 +206,7 @@ export default { (data.guild_scheduled_events as unknown[])[i] = undefined; if (rEvent) { const eventJson = JSON.stringify(rEvent); - redis.queueSync((p) => { + redis.queueSync((p: ChainableCommanderInterface) => { p.set(`cache:events:${event.id}:current`, eventJson, 'EX', 604800); p.hset(`keystore:events:${event.guild_id}`, `cache:events:${event.id}`, 0); }); diff --git a/src/BaseClient/Cluster/Redis.ts b/src/BaseClient/Cluster/Redis.ts index ccd9a9b..6ec15f6 100644 --- a/src/BaseClient/Cluster/Redis.ts +++ b/src/BaseClient/Cluster/Redis.ts @@ -1,11 +1,11 @@ -import { BunRedisWrapper } from '@ayako/utility'; +import { createRedisWrapper } from '@ayako/utility'; const cacheDBnum = process.argv.includes('--dev') ? process.env.devCacheDB : process.env.cacheDB; if (!cacheDBnum && typeof cacheDBnum !== 'number') { throw new Error('No cache DB number provided in env vars'); } -export const cacheDB = new BunRedisWrapper({ +export const cacheDB = createRedisWrapper({ host: process.argv.includes('--local') ? 'localhost' : 'redis', db: Number(cacheDBnum), }); From 0bb917ef504417c83c75c1be38fddff1bde3cdf9 Mon Sep 17 00:00:00 2001 From: Larsundso Date: Fri, 16 Jan 2026 15:05:55 +0100 Subject: [PATCH 10/21] refactor: streamline event emission in EventBus and update cache handling in CacheHandlers --- package.json | 2 +- src/BaseClient/Bot/CacheHandlers/index.ts | 6 +++--- src/Util/EventBus.ts | 18 +++++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/package.json b/package.json index 2593c36..751bb82 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "build": "swc src -d dist --strip-leading-paths --copy-files", "dev": "pnpm build && bun ./dist/index.js --debug --warn --dev --debug-db --local", "lint": "pnpx eslint 'src/**/*.ts' --fix", - "run": "bun ./dist/index.js --debug", + "run": "bun ./dist/index.js", "start": "pnpm run run", "watch": "swc src -d dist --strip-leading-paths --copy-files --watch" }, diff --git a/src/BaseClient/Bot/CacheHandlers/index.ts b/src/BaseClient/Bot/CacheHandlers/index.ts index 33a2a9e..8c460c4 100644 --- a/src/BaseClient/Bot/CacheHandlers/index.ts +++ b/src/BaseClient/Bot/CacheHandlers/index.ts @@ -47,14 +47,14 @@ export default (data: GatewayDispatchPayload, shardId: number) => { | Promise | unknown; if (res instanceof Promise) { - res.then(() => emit(data.t, data.d)).catch(() => emit(data.t, data.d)); + res.then(() => emit.call(redis, data.t, data.d)).catch(() => emit.call(redis, data.t, data.d)); } else { - emit(data.t, data.d); + emit.call(redis, data.t, data.d); } } catch (err) { // eslint-disable-next-line no-console console.error(`[CacheHandler] Error processing ${data.t}:`, err); - emit(data.t, data.d); + emit.call(redis, data.t, data.d); } }; diff --git a/src/Util/EventBus.ts b/src/Util/EventBus.ts index 644f81a..ae7b654 100644 --- a/src/Util/EventBus.ts +++ b/src/Util/EventBus.ts @@ -1,12 +1,12 @@ +import type { Cache } from '@ayako/utility'; import type { GatewayDispatchEvents, GatewayDispatchPayload } from 'discord-api-types/gateway/v10'; -import cache from '../BaseClient/Bot/Cache.js'; +export default function ( + this: Cache, + type: GatewayDispatchEvents, + data: GatewayDispatchPayload['d'], +) { + this.logger.debug(`[EventBus] Emitting event: ${type}`); -const emit = (type: GatewayDispatchEvents, data: GatewayDispatchPayload['d']) => { - // eslint-disable-next-line no-console - if (process.argv.includes('--debug')) console.log(`[EventBus] Emitting event: ${type}`); - - cache.cachePub.publish(type, JSON.stringify(data)); -}; - -export default emit; + this.cachePub.publish(type, JSON.stringify(data)); +} From dbf0795589ddebb28261e4403c1681e79135fee6 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Fri, 16 Jan 2026 20:56:22 +0100 Subject: [PATCH 11/21] refactor: replace fixed expiration time with random duration for cache entries in interaction handlers --- src/Util/firstChannelInteraction.ts | 12 +++++++++++- src/Util/firstGuildInteraction.ts | 12 ++++++++++-- src/Util/requestGuildMembers.ts | 11 ++++++++++- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/Util/firstChannelInteraction.ts b/src/Util/firstChannelInteraction.ts index e60524b..b8cd540 100644 --- a/src/Util/firstChannelInteraction.ts +++ b/src/Util/firstChannelInteraction.ts @@ -1,3 +1,5 @@ +import { getRandom } from '@ayako/utility'; + import cache from '../BaseClient/Bot/Cache.js'; import requestChannelPins from './requestChannelPins.js'; @@ -8,7 +10,15 @@ export default async (channelId: string, guildId: string) => { const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { pipeline.hget('channel-interacts', channelId); pipeline.hset('channel-interacts', channelId, '1'); - pipeline.call('hexpire', 'channel-interacts', 604800, 'NX', 'FIELDS', 1, channelId); + pipeline.call( + 'hexpire', + 'channel-interacts', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + channelId, + ); }); if (isMember === '1') return false; diff --git a/src/Util/firstGuildInteraction.ts b/src/Util/firstGuildInteraction.ts index f59ad20..7d87cb2 100644 --- a/src/Util/firstGuildInteraction.ts +++ b/src/Util/firstGuildInteraction.ts @@ -1,4 +1,4 @@ -import { getGuildPerms } from '@ayako/utility'; +import { getGuildPerms, getRandom } from '@ayako/utility'; import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; import cache from '../BaseClient/Bot/Cache.js'; @@ -12,7 +12,15 @@ export default async (guildId: string) => { const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { pipeline.hget('guild-interacts', guildId); pipeline.hset('guild-interacts', guildId, '1'); - pipeline.call('hexpire', 'guild-interacts', 604800, 'NX', 'FIELDS', 1, guildId); + pipeline.call( + 'hexpire', + 'guild-interacts', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + guildId, + ); }); if (isMember === '1') return false; diff --git a/src/Util/requestGuildMembers.ts b/src/Util/requestGuildMembers.ts index 4e148f7..ea47cc2 100644 --- a/src/Util/requestGuildMembers.ts +++ b/src/Util/requestGuildMembers.ts @@ -1,3 +1,4 @@ +import { getRandom } from '@ayako/utility'; import { GatewayOpcodes } from 'discord-api-types/gateway/v10'; import RedisCache from '../BaseClient/Bot/Cache.js'; @@ -16,7 +17,15 @@ const requestGuildMembers = async (guildId: string) => { const [isMember] = await RedisCache.execPipeline<[string | null]>((pipeline) => { pipeline.hget('guild-members-requested', guildId); pipeline.hset('guild-members-requested', guildId, '1'); - pipeline.call('hexpire', 'guild-members-requested', 604800, 'NX', 'FIELDS', 1, guildId); + pipeline.call( + 'hexpire', + 'guild-members-requested', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + guildId, + ); }); if (isMember === '1') { From 040b7a04ff4295b7fa7386e14b39c8ca3e35c46b Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:08:49 +0100 Subject: [PATCH 12/21] feat: Implement Priority Queue system for managing Discord API requests --- src/BaseClient/Bot/CacheHandlers/Guilds.ts | 8 +- src/BaseClient/Bot/CacheHandlers/index.ts | 7 +- src/BaseClient/Bot/Client.ts | 12 - src/Util/PriorityQueue/BinaryHeap.ts | 148 ++++++ src/Util/PriorityQueue/GatewayQueue.ts | 148 ++++++ src/Util/PriorityQueue/RestQueue.ts | 537 +++++++++++++++++++++ src/Util/PriorityQueue/index.ts | 116 +++++ src/Util/PriorityQueue/types.ts | 70 +++ src/Util/firstChannelInteraction.ts | 16 +- src/Util/firstGuildInteraction.ts | 170 +------ src/Util/requestChannelPins.ts | 102 +--- src/Util/requestGuildMembers.ts | 68 +-- src/bot.ts | 7 +- 13 files changed, 1060 insertions(+), 349 deletions(-) create mode 100644 src/Util/PriorityQueue/BinaryHeap.ts create mode 100644 src/Util/PriorityQueue/GatewayQueue.ts create mode 100644 src/Util/PriorityQueue/RestQueue.ts create mode 100644 src/Util/PriorityQueue/index.ts create mode 100644 src/Util/PriorityQueue/types.ts diff --git a/src/BaseClient/Bot/CacheHandlers/Guilds.ts b/src/BaseClient/Bot/CacheHandlers/Guilds.ts index c500a6f..78c614f 100644 --- a/src/BaseClient/Bot/CacheHandlers/Guilds.ts +++ b/src/BaseClient/Bot/CacheHandlers/Guilds.ts @@ -28,7 +28,8 @@ import { type GuildMemberFlags, } from 'discord-api-types/v10'; -import firstGuildInteraction, { tasks } from '../../../Util/firstGuildInteraction.js'; +import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; import { cache } from '../Client.js'; @@ -360,7 +361,8 @@ export default { const success = await firstGuildInteraction(data.guild_id); if (success) return; - tasks.integrations(data.guild_id); + const memberCount = cache.members.get(data.guild_id) || 0; + priorityQueue.enqueueGuildTask(data.guild_id, memberCount, 'integrations'); }, [GatewayDispatchEvents.GuildMemberAdd]: async (data: GatewayGuildMemberAddDispatchData) => { @@ -384,7 +386,7 @@ export default { // eslint-disable-next-line no-console console.log('[Chunk] Finished receiving member chunks for', data.guild_id); - cache.requestingGuild = null; + priorityQueue.onMemberChunkComplete(data.guild_id); } if (data.chunk_index === 0) { diff --git a/src/BaseClient/Bot/CacheHandlers/index.ts b/src/BaseClient/Bot/CacheHandlers/index.ts index 8c460c4..78b4f1e 100644 --- a/src/BaseClient/Bot/CacheHandlers/index.ts +++ b/src/BaseClient/Bot/CacheHandlers/index.ts @@ -22,8 +22,10 @@ import { } from '../../../Typings/Channel.js'; import emit from '../../../Util/EventBus.js'; import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; -import firstGuildInteraction, { tasks } from '../../../Util/firstGuildInteraction.js'; +import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; +import { cache } from '../Client.js'; import ready from '../Events/ready.js'; import AutoModeration from './AutoModeration.js'; @@ -124,7 +126,8 @@ const caches: Record< }, [GatewayDispatchEvents.WebhooksUpdate]: (data: GatewayWebhooksUpdateDispatchData) => { - tasks.webhooks(data.guild_id); + const memberCount = cache.members.get(data.guild_id) || 0; + priorityQueue.enqueueGuildTask(data.guild_id, memberCount, 'webhooks'); }, [GatewayDispatchEvents.TypingStart]: async (data: GatewayTypingStartDispatchData) => { diff --git a/src/BaseClient/Bot/Client.ts b/src/BaseClient/Bot/Client.ts index 3c2984c..1328ddd 100644 --- a/src/BaseClient/Bot/Client.ts +++ b/src/BaseClient/Bot/Client.ts @@ -67,12 +67,6 @@ export const cache: { stickers: Map; sounds: Map; user: APIUser | null; - requestingGuild: string | null; - requestGuildQueue: Set; - requestingPins: string | null; - requestPinsQueue: Set; - requestPinsGuildMap: Map; - requestPinsPaused: boolean; } = { guilds: 0, members: new Map(), @@ -81,10 +75,4 @@ export const cache: { stickers: new Map(), sounds: new Map(), user: null, - requestingGuild: null, - requestGuildQueue: new Set(), - requestingPins: null, - requestPinsQueue: new Set(), - requestPinsGuildMap: new Map(), - requestPinsPaused: false, }; diff --git a/src/Util/PriorityQueue/BinaryHeap.ts b/src/Util/PriorityQueue/BinaryHeap.ts new file mode 100644 index 0000000..9b97930 --- /dev/null +++ b/src/Util/PriorityQueue/BinaryHeap.ts @@ -0,0 +1,148 @@ +/** + * Generic Binary Heap (Max-Heap by default) + * O(log n) insertion and extraction + */ +export class BinaryHeap { + private heap: T[] = []; + private comparator: (a: T, b: T) => number; + + /** + * Creates a new binary heap + * @param comparator Function that returns: + * - negative if a should come before b (higher priority) + * - positive if b should come before a (higher priority) + * - zero if equal priority + */ + constructor(comparator: (a: T, b: T) => number) { + this.comparator = comparator; + } + + /** + * Number of items in the heap + */ + get size(): number { + return this.heap.length; + } + + /** + * Check if heap is empty + */ + get isEmpty(): boolean { + return this.heap.length === 0; + } + + /** + * Insert an item into the heap + * O(log n) + */ + push(item: T): void { + this.heap.push(item); + this.bubbleUp(this.heap.length - 1); + } + + /** + * Remove and return the highest priority item + * O(log n) + */ + pop(): T | undefined { + if (this.heap.length === 0) return undefined; + if (this.heap.length === 1) return this.heap.pop(); + + const [top] = this.heap; + this.heap[0] = this.heap.pop()!; + this.bubbleDown(0); + return top; + } + + /** + * Peek at the highest priority item without removing it + * O(1) + */ + peek(): T | undefined { + return this.heap[0]; + } + + /** + * Clear all items from the heap + */ + clear(): void { + this.heap = []; + } + + /** + * Get all items (for debugging/inspection) + */ + toArray(): T[] { + return [...this.heap]; + } + + /** + * Remove an item that matches the predicate + * O(n) - use sparingly + */ + remove(predicate: (item: T) => boolean): T | undefined { + const index = this.heap.findIndex(predicate); + if (index === -1) return undefined; + + const item = this.heap[index]; + if (index === this.heap.length - 1) { + this.heap.pop(); + } else { + this.heap[index] = this.heap.pop()!; + // May need to bubble up or down depending on new item + const parent = Math.floor((index - 1) / 2); + if (index > 0 && this.comparator(this.heap[index], this.heap[parent]) < 0) { + this.bubbleUp(index); + } else { + this.bubbleDown(index); + } + } + return item; + } + + /** + * Check if an item matching predicate exists + * O(n) + */ + has(predicate: (item: T) => boolean): boolean { + return this.heap.some(predicate); + } + + /** + * Move item up the heap until heap property is satisfied + */ + private bubbleUp(index: number): void { + while (index > 0) { + const parent = Math.floor((index - 1) / 2); + if (this.comparator(this.heap[index], this.heap[parent]) >= 0) break; + + [this.heap[index], this.heap[parent]] = [this.heap[parent], this.heap[index]]; + index = parent; + } + } + + /** + * Move item down the heap until heap property is satisfied + */ + private bubbleDown(index: number): void { + const { length } = this.heap; + + while (true) { + const left = 2 * index + 1; + const right = 2 * index + 2; + let smallest = index; + + if (left < length && this.comparator(this.heap[left], this.heap[smallest]) < 0) { + smallest = left; + } + if (right < length && this.comparator(this.heap[right], this.heap[smallest]) < 0) { + smallest = right; + } + + if (smallest === index) break; + + [this.heap[index], this.heap[smallest]] = [this.heap[smallest], this.heap[index]]; + index = smallest; + } + } +} diff --git a/src/Util/PriorityQueue/GatewayQueue.ts b/src/Util/PriorityQueue/GatewayQueue.ts new file mode 100644 index 0000000..c116f97 --- /dev/null +++ b/src/Util/PriorityQueue/GatewayQueue.ts @@ -0,0 +1,148 @@ +/** + * Gateway Queue - Handles member chunk requests via Gateway opcode 8 + * + * - Single request at a time (must wait for all chunks) + * - Priority by member count (larger guilds first) + * - Notified when chunks complete via onChunkComplete() + */ +import { getRandom } from '@ayako/utility'; +import { GatewayOpcodes } from 'discord-api-types/gateway/v10'; + +import RedisCache from '../../BaseClient/Bot/Cache.js'; +import { gateway } from '../../BaseClient/Bot/Client.js'; +import calculateShardId from '../calculateShardId.js'; + +import { BinaryHeap } from './BinaryHeap.js'; +import { CONFIG, type GatewayQueueItem } from './types.js'; + +/** + * Priority comparator for gateway queue items + * Higher member count = higher priority (comes first) + * Equal member count: FIFO by addedAt + */ +const gatewayComparator = (a: GatewayQueueItem, b: GatewayQueueItem): number => { + if (a.memberCount !== b.memberCount) return b.memberCount - a.memberCount; + return a.addedAt - b.addedAt; +}; + +class GatewayQueue { + private queue = new BinaryHeap(gatewayComparator); + private currentGuildId: string | null = null; + private processingInterval: ReturnType | null = null; + private isProcessing = false; + + /** + * Get the currently processing guild ID + */ + get currentGuild(): string | null { + return this.currentGuildId; + } + + /** + * Get the number of items in the queue + */ + get size(): number { + return this.queue.size; + } + + /** + * Start the queue processing interval + */ + start(): void { + if (this.processingInterval) return; + this.processingInterval = setInterval(() => { + this.process().catch((err) => { + // eslint-disable-next-line no-console + console.error('[GatewayQueue] Error processing queue:', err); + }); + }, CONFIG.GATEWAY_INTERVAL); + // eslint-disable-next-line no-console + console.log('[GatewayQueue] Started'); + } + + /** + * Stop the queue processing interval + */ + stop(): void { + if (this.processingInterval) { + clearInterval(this.processingInterval); + this.processingInterval = null; + // eslint-disable-next-line no-console + console.log('[GatewayQueue] Stopped'); + } + } + + /** + * Add a guild to the queue for member chunk request + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + async enqueue(guildId: string, memberCount: number): Promise { + if (this.currentGuildId === guildId) return; + if (this.queue.has((item) => item.guildId === guildId)) return; + + const [alreadyRequested] = await RedisCache.execPipeline<[string | null]>((pipeline) => { + pipeline.hget('guild-members-requested', guildId); + pipeline.hset('guild-members-requested', guildId, '1'); + pipeline.call( + 'hexpire', + 'guild-members-requested', + getRandom(604800 / 2, 604800), + 'NX', + 'FIELDS', + 1, + guildId, + ); + }); + if (alreadyRequested === '1') return; + + this.queue.push({ + type: 'gateway', + guildId, + memberCount, + addedAt: Date.now(), + }); + } + + /** + * Called when all member chunks have been received for a guild + * This unblocks the queue to process the next guild + */ + onChunkComplete(guildId: string): void { + if (this.currentGuildId === guildId) { + this.currentGuildId = null; + } + } + + /** + * Process the next item in the queue + */ + private async process(): Promise { + if (this.isProcessing) return; + if (this.currentGuildId !== null) return; + if (this.queue.isEmpty) return; + + this.isProcessing = true; + + try { + const item = this.queue.pop(); + if (!item) return; + + this.currentGuildId = item.guildId; + + gateway.send(calculateShardId(item.guildId), { + op: GatewayOpcodes.RequestGuildMembers, + d: { guild_id: item.guildId, presences: false, limit: 0, query: '' }, + }); + + // eslint-disable-next-line no-console + console.log( + `[GatewayQueue] Requested members for ${item.guildId} (${item.memberCount} members) | Queue: ${this.queue.size}`, + ); + } finally { + this.isProcessing = false; + } + } +} + +export const gatewayQueue = new GatewayQueue(); diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts new file mode 100644 index 0000000..bda9c94 --- /dev/null +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -0,0 +1,537 @@ +/** + * REST Queue - Handles all Discord REST API calls with rate limiting + * + * - Max 5 concurrent requests (configurable) + * - Per-endpoint rate limit tracking + * - Re-queues on 429 with retry_after delay + * - Priority by member count (larger guilds first), then guild > channel + */ +import { getChannelPerms, getGuildPerms } from '@ayako/utility'; +import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; + +import redis from '../../BaseClient/Bot/Cache.js'; +import { api, cache as clientCache } from '../../BaseClient/Bot/Client.js'; +import requestEventSubscribers from '../requestEventSubscribers.js'; +import requestVoiceChannelStatuses from '../requestVoiceChannelStatuses.js'; + +import { BinaryHeap } from './BinaryHeap.js'; +import { + CONFIG, + type ChannelTaskName, + type GuildTaskName, + type RateLimitState, + type RestQueueItem, +} from './types.js'; + +/** + * Priority comparator for REST queue items + * Higher member count = higher priority + * Equal member count: guild > channel + * Equal type: FIFO by addedAt + */ +const restComparator = (a: RestQueueItem, b: RestQueueItem): number => { + if (a.memberCount !== b.memberCount) return b.memberCount - a.memberCount; + if (a.type !== b.type) return a.type === 'guild' ? -1 : 1; + return a.addedAt - b.addedAt; +}; + +class RestQueue { + private queue = new BinaryHeap(restComparator); + private rateLimits = new Map(); + private activeRequests = 0; + private processingInterval: ReturnType | null = null; + private isProcessing = false; + + /** + * Get the number of items in the queue + */ + get size(): number { + return this.queue.size; + } + + /** + * Get the number of active requests + */ + get active(): number { + return this.activeRequests; + } + + /** + * Start the queue processing interval + */ + start(): void { + if (this.processingInterval) return; + this.processingInterval = setInterval(() => { + this.process().catch((err) => { + // eslint-disable-next-line no-console + console.error('[RestQueue] Error processing queue:', err); + }); + }, CONFIG.REST_INTERVAL); + // eslint-disable-next-line no-console + console.log('[RestQueue] Started'); + } + + /** + * Stop the queue processing interval + */ + stop(): void { + if (this.processingInterval) { + clearInterval(this.processingInterval); + this.processingInterval = null; + // eslint-disable-next-line no-console + console.log('[RestQueue] Stopped'); + } + } + + /** + * Enqueue all guild tasks for first guild interaction + */ + enqueueGuildTasks(guildId: string, memberCount: number): void { + const now = Date.now(); + const guildTasks: GuildTaskName[] = [ + 'vcStatus', + 'autoModRules', + 'commands', + 'commandPermissions', + 'welcomeScreen', + 'onboarding', + 'scheduledEvents', + 'webhooks', + 'integrations', + 'invites', + ]; + + for (const taskName of guildTasks) { + this.queue.push({ + type: 'guild', + id: guildId, + guildId, + memberCount, + taskName, + endpoint: `guilds/${guildId}/${taskName}`, + addedAt: now, + }); + } + } + + /** + * Enqueue a single guild task (for subsequent updates, not first interaction) + */ + enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { + if ( + this.queue.has( + (item) => item.type === 'guild' && item.guildId === guildId && item.taskName === taskName, + ) + ) { + return; + } + + this.queue.push({ + type: 'guild', + id: guildId, + guildId, + memberCount, + taskName, + endpoint: `guilds/${guildId}/${taskName}`, + addedAt: Date.now(), + }); + } + + /** + * Enqueue a channel task (e.g., pins) for first channel interaction + */ + enqueueChannelTask( + channelId: string, + guildId: string, + memberCount: number, + taskName: ChannelTaskName, + ): void { + if ( + this.queue.has( + (item) => item.type === 'channel' && item.id === channelId && item.taskName === taskName, + ) + ) { + return; + } + + this.queue.push({ + type: 'channel', + id: channelId, + guildId, + memberCount, + taskName, + endpoint: `channels/${channelId}/${taskName}`, + addedAt: Date.now(), + }); + } + + /** + * Process items from the queue + */ + private async process(): Promise { + if (this.isProcessing) return; + if (this.activeRequests >= CONFIG.REST_MAX_CONCURRENT) return; + if (this.queue.isEmpty) return; + + this.isProcessing = true; + + try { + this.cleanupRateLimits(); + + while (this.activeRequests < CONFIG.REST_MAX_CONCURRENT && !this.queue.isEmpty) { + const item = this.findNextUnblockedItem(); + if (!item) break; + + this.executeTask(item).catch((err) => { + // eslint-disable-next-line no-console + console.error(`[RestQueue] Task ${item.taskName} failed:`, err); + }); + } + } finally { + this.isProcessing = false; + } + } + + /** + * Find the next item that isn't rate limited + */ + private findNextUnblockedItem(): RestQueueItem | undefined { + const items = this.queue.toArray(); + + for (let i = 0; i < items.length; i++) { + const item = items[i]; + const rateLimit = this.rateLimits.get(item.endpoint); + + if (!rateLimit || !rateLimit.paused || Date.now() >= rateLimit.resetAt) { + this.queue.remove((it) => it === item); + return item; + } + } + + return undefined; + } + + /** + * Clean up expired rate limits + */ + private cleanupRateLimits(): void { + const now = Date.now(); + for (const [endpoint, state] of this.rateLimits) { + if (now >= state.resetAt) { + this.rateLimits.delete(endpoint); + } + } + } + + /** + * Handle rate limit (429) response + */ + private onRateLimit(item: RestQueueItem, retryAfter: number): void { + this.rateLimits.set(item.endpoint, { + endpoint: item.endpoint, + resetAt: Date.now() + retryAfter, + paused: true, + }); + + this.queue.push(item); + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Rate limited on ${item.endpoint}, retry after ${retryAfter}ms | Queue: ${this.queue.size}`, + ); + } + + /** + * Execute a task + */ + private async executeTask(item: RestQueueItem): Promise { + this.activeRequests++; + + try { + if (item.type === 'guild') { + await this.executeGuildTask(item); + } else { + await this.executeChannelTask(item); + } + } catch (error: unknown) { + if (this.isRateLimitError(error)) { + const retryAfter = this.getRateLimitRetryAfter(error); + this.onRateLimit(item, retryAfter); + } + } finally { + this.activeRequests--; + } + } + + /** + * Execute a guild task + */ + private async executeGuildTask(item: RestQueueItem): Promise { + const { guildId } = item; + + switch (item.taskName) { + case 'vcStatus': + await requestVoiceChannelStatuses(guildId); + break; + + case 'autoModRules': + await this.taskAutoModRules(guildId); + break; + + case 'commands': + await this.taskCommands(guildId); + break; + + case 'commandPermissions': + await this.taskCommandPermissions(guildId); + break; + + case 'welcomeScreen': + await this.taskWelcomeScreen(guildId); + break; + + case 'onboarding': + await this.taskOnboarding(guildId); + break; + + case 'scheduledEvents': + await this.taskScheduledEvents(guildId); + break; + + case 'webhooks': + await this.taskWebhooks(guildId); + break; + + case 'integrations': + await this.taskIntegrations(guildId); + break; + + case 'invites': + await this.taskInvites(guildId); + break; + + default: + break; + } + } + + /** + * Execute a channel task + */ + private async executeChannelTask(item: RestQueueItem): Promise { + if (item.taskName === 'pins') { + await this.taskPins(item.id, item.guildId); + } + } + + //#region Guild Tasks + + private async taskAutoModRules(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const keystoreKey = redis.automods.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + const rules = await api.guilds.getAutoModerationRules(guildId).catch(() => []); + rules.forEach((r) => redis.automods.set(r)); + } + + private async taskCommands(guildId: string): Promise { + if (!clientCache.user) return; + + const keystoreKey = redis.commands.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const commands = await api.applicationCommands + .getGuildCommands(clientCache.user.id, guildId) + .catch(() => []); + commands.forEach((c) => redis.guildCommands.set({ ...c, guild_id: guildId })); + } + + private async taskCommandPermissions(guildId: string): Promise { + if (!clientCache.user) return; + + const keystoreKey = redis.commandPermissions.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const commandPerms = await api.applicationCommands + .getGuildCommandsPermissions(clientCache.user.id, guildId) + .catch(() => []); + + commandPerms.forEach((command) => + command.permissions.forEach((perm) => redis.commandPermissions.set(perm, guildId, command.id)), + ); + } + + private async taskWelcomeScreen(guildId: string): Promise { + const guild = await redis.guilds.get(guildId); + if (!guild) return; + + if (!guild.features.includes(GuildFeature.WelcomeScreenEnabled)) { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + } + + const keystoreKey = redis.welcomeScreens.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const welcomeScreen = await api.guilds.getWelcomeScreen(guildId).catch(() => null); + if (!welcomeScreen) return; + + redis.welcomeScreens.set(welcomeScreen, guildId); + } + + private async taskOnboarding(guildId: string): Promise { + const guild = await redis.guilds.get(guildId); + if (!guild) return; + + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const onboarding = await api.guilds.getOnboarding(guildId); + redis.onboardings.set(onboarding); + } + + private async taskScheduledEvents(guildId: string): Promise { + const keystoreKey = redis.events.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const scheduledEvents = await api.guilds + .getScheduledEvents(guildId, { with_user_count: true }) + .catch(() => []); + scheduledEvents.forEach((e) => redis.events.set(e)); + + const members = ( + await Promise.all(scheduledEvents.map((e) => requestEventSubscribers(e))) + ).flat(); + + members.forEach((u) => { + redis.users.set(u.user); + redis.eventUsers.set( + { + guild_id: guildId, + guild_scheduled_event_id: u.guildScheduledEventId, + user: u.user, + user_id: u.user.id, + member: u.member, + }, + guildId, + ); + + if (u.member) redis.members.set(u.member, guildId); + }); + } + + private async taskWebhooks(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ( + (perms.response & PermissionFlagsBits.ManageWebhooks) !== + PermissionFlagsBits.ManageWebhooks + ) { + return; + } + + const keystoreKey = redis.webhooks.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const webhooks = await api.guilds.getWebhooks(guildId).catch(() => []); + webhooks.forEach((w) => redis.webhooks.set(w)); + } + + private async taskIntegrations(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + return; + } + + const keystoreKey = redis.integrations.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey); + + const integrations = await api.guilds.getIntegrations(guildId).catch(() => []); + integrations.forEach((i) => redis.integrations.set(i, guildId)); + } + + private async taskInvites(guildId: string): Promise { + const perms = await getGuildPerms.call(redis, guildId, clientCache.user?.id || '0'); + if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { + if ((perms.response & PermissionFlagsBits.ViewAuditLog) !== PermissionFlagsBits.ViewAuditLog) { + return; + } + } + + const keystoreKey = redis.invites.keystore(guildId); + const keys = await redis.cacheDb.hkeys(keystoreKey); + const guildCodestoreKey = redis.invites.codestore(guildId); + const globalCodestoreKey = redis.invites.codestore(); + + const codes = await redis.cacheDb.hkeys(guildCodestoreKey); + + if (keys.length > 0) await redis.cacheDb.del(...keys, keystoreKey, guildCodestoreKey); + if (codes.length > 0) await redis.cacheDb.hdel(globalCodestoreKey, ...codes); + + const invites = await api.guilds.getInvites(guildId).catch(() => []); + invites.forEach((i) => redis.invites.set(i)); + } + + //#region Channel Tasks + + private async taskPins(channelId: string, guildId: string): Promise { + const channelPerms = await getChannelPerms.call( + redis, + guildId, + clientCache.user?.id || '0', + channelId, + ); + const readPerms = PermissionFlagsBits.ViewAuditLog | PermissionFlagsBits.ReadMessageHistory; + if ((channelPerms.allow & readPerms) !== readPerms) return; + + await redis.pins.delAll(channelId); + + const pins = await api.channels.getPins(channelId); + + for (let i = 0; i < pins.length; i++) { + const pin = pins[i]; + redis.pins.set(channelId, pin.id); + redis.messages.set(pin, guildId); + (pins as unknown[])[i] = undefined; + } + pins.length = 0; + } + + //#region Utilities + + private isRateLimitError(error: unknown): boolean { + if (error && typeof error === 'object' && 'status' in error) { + return (error as { status: number }).status === 429; + } + return false; + } + + private getRateLimitRetryAfter(error: unknown): number { + if (error && typeof error === 'object') { + if ('rawError' in error) { + // eslint-disable-next-line @typescript-eslint/naming-convention + const { rawError } = error as { rawError: { retry_after?: number } }; + if (rawError?.retry_after) { + return Math.ceil(rawError.retry_after * 1000); + } + } + } + return CONFIG.DEFAULT_RETRY_AFTER; + } +} + +export const restQueue = new RestQueue(); diff --git a/src/Util/PriorityQueue/index.ts b/src/Util/PriorityQueue/index.ts new file mode 100644 index 0000000..e203365 --- /dev/null +++ b/src/Util/PriorityQueue/index.ts @@ -0,0 +1,116 @@ +/** + * Priority Queue System + * + * Provides throttled request handling to prevent Discord API rate limits + * after Redis FLUSHDB or cold starts. + * + * Usage: + * ```ts + * import { priorityQueue } from './PriorityQueue/index.js'; + * + * // Start on bot initialization + * priorityQueue.start(); + * + * // Enqueue requests + * priorityQueue.enqueueGuildTasks(guildId, memberCount); + * priorityQueue.enqueueMembers(guildId, memberCount); + * priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); + * + * // Notify when member chunks complete + * priorityQueue.onMemberChunkComplete(guildId); + * ``` + */ + +import { gatewayQueue } from './GatewayQueue.js'; +import { restQueue } from './RestQueue.js'; +import type { GuildTaskName } from './types.js'; + +export * from './types.js'; +export { BinaryHeap } from './BinaryHeap.js'; +export { gatewayQueue } from './GatewayQueue.js'; +export { restQueue } from './RestQueue.js'; + +/** + * Unified interface for the priority queue system + */ +export const priorityQueue = { + /** + * Start all queue processors + */ + start(): void { + gatewayQueue.start(); + restQueue.start(); + }, + + /** + * Stop all queue processors + */ + stop(): void { + gatewayQueue.stop(); + restQueue.stop(); + }, + + /** + * Enqueue all guild tasks for first guild interaction + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + enqueueGuildTasks(guildId: string, memberCount: number): void { + restQueue.enqueueGuildTasks(guildId, memberCount); + }, + + /** + * Enqueue a single guild task (for subsequent updates, not first interaction) + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + * @param taskName The specific task to enqueue + */ + enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { + restQueue.enqueueGuildTask(guildId, memberCount, taskName); + }, + + /** + * Enqueue member chunk request for a guild + * @param guildId Guild ID + * @param memberCount Approximate member count for priority + */ + async enqueueMembers(guildId: string, memberCount: number): Promise { + await gatewayQueue.enqueue(guildId, memberCount); + }, + + /** + * Enqueue channel pins request + * @param channelId Channel ID + * @param guildId Guild ID + * @param memberCount Guild member count for priority + */ + enqueueChannelPins(channelId: string, guildId: string, memberCount: number): void { + restQueue.enqueueChannelTask(channelId, guildId, memberCount, 'pins'); + }, + + /** + * Notify that member chunks have completed for a guild + * @param guildId Guild ID + */ + onMemberChunkComplete(guildId: string): void { + gatewayQueue.onChunkComplete(guildId); + }, + + /** + * Get current queue sizes (for monitoring) + */ + getStats(): { gatewayQueue: number; restQueue: number; activeRestRequests: number } { + return { + gatewayQueue: gatewayQueue.size, + restQueue: restQueue.size, + activeRestRequests: restQueue.active, + }; + }, + + /** + * Get the guild currently waiting for member chunks + */ + get currentMemberRequestGuild(): string | null { + return gatewayQueue.currentGuild; + }, +}; diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts new file mode 100644 index 0000000..517fece --- /dev/null +++ b/src/Util/PriorityQueue/types.ts @@ -0,0 +1,70 @@ +/** + * Types for Priority Queue system + * Used to throttle Discord API requests after FLUSHDB + */ + +/** + * Item in the Gateway queue (member chunk requests) + */ +export type GatewayQueueItem = { + type: 'gateway'; + guildId: string; + memberCount: number; + addedAt: number; +}; + +/** + * Task names for REST API calls on first guild interaction + */ +export type GuildTaskName = + | 'autoModRules' + | 'commands' + | 'commandPermissions' + | 'welcomeScreen' + | 'onboarding' + | 'scheduledEvents' + | 'webhooks' + | 'integrations' + | 'invites' + | 'vcStatus'; + +/** + * Task names for REST API calls on first channel interaction + */ +export type ChannelTaskName = 'pins'; + +/** + * Item in the REST queue + */ +export type RestQueueItem = { + type: 'guild' | 'channel'; + id: string; + guildId: string; + memberCount: number; + taskName: GuildTaskName | ChannelTaskName; + endpoint: string; + addedAt: number; +}; + +/** + * Rate limit state for an endpoint + */ +export type RateLimitState = { + endpoint: string; + resetAt: number; + paused: boolean; +}; + +/** + * Queue configuration + */ +export const CONFIG = { + /** Milliseconds between gateway queue processing */ + GATEWAY_INTERVAL: 100, + /** Milliseconds between REST queue processing */ + REST_INTERVAL: 50, + /** Maximum concurrent REST requests */ + REST_MAX_CONCURRENT: 5, + /** Default retry after time in ms if not provided in 429 response */ + DEFAULT_RETRY_AFTER: 5000, +} as const; diff --git a/src/Util/firstChannelInteraction.ts b/src/Util/firstChannelInteraction.ts index b8cd540..e7282f8 100644 --- a/src/Util/firstChannelInteraction.ts +++ b/src/Util/firstChannelInteraction.ts @@ -1,8 +1,9 @@ import { getRandom } from '@ayako/utility'; import cache from '../BaseClient/Bot/Cache.js'; +import { cache as clientCache } from '../BaseClient/Bot/Client.js'; -import requestChannelPins from './requestChannelPins.js'; +import { priorityQueue } from './PriorityQueue/index.js'; export default async (channelId: string, guildId: string) => { if (!channelId) return false; @@ -22,13 +23,10 @@ export default async (channelId: string, guildId: string) => { }); if (isMember === '1') return false; - await Promise.allSettled(Object.values(tasks).map((t) => t(channelId, guildId))); - return true; -}; + if (!guildId) return false; -export const tasks = { - pins: async (channelId: string, guildId: string) => { - if (!guildId) return; - await requestChannelPins(channelId, guildId); - }, + const memberCount = clientCache.members.get(guildId) || 0; + priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); + + return true; }; diff --git a/src/Util/firstGuildInteraction.ts b/src/Util/firstGuildInteraction.ts index 7d87cb2..2d8ea13 100644 --- a/src/Util/firstGuildInteraction.ts +++ b/src/Util/firstGuildInteraction.ts @@ -1,12 +1,9 @@ -import { getGuildPerms, getRandom } from '@ayako/utility'; -import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; +import { getRandom } from '@ayako/utility'; import cache from '../BaseClient/Bot/Cache.js'; -import { api, cache as clientCache } from '../BaseClient/Bot/Client.js'; +import { cache as clientCache } from '../BaseClient/Bot/Client.js'; -import requestEventSubscribers from './requestEventSubscribers.js'; -import requestGuildMembers from './requestGuildMembers.js'; -import requestVoiceChannelStatuses from './requestVoiceChannelStatuses.js'; +import { priorityQueue } from './PriorityQueue/index.js'; export default async (guildId: string) => { const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { @@ -24,163 +21,10 @@ export default async (guildId: string) => { }); if (isMember === '1') return false; - await Promise.allSettled(Object.values(tasks).map((t) => t(guildId))); - return true; -}; - -export const tasks = { - vcStatus: (guildId: string) => requestVoiceChannelStatuses(guildId), - autoModRules: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const keystoreKey = cache.automods.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - const rules = await api.guilds.getAutoModerationRules(guildId).catch(() => []); - rules.forEach((r) => cache.automods.set(r)); - }, - commands: async (guildId: string) => { - if (!clientCache.user) return; - - const keystoreKey = cache.commands.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const commands = await api.applicationCommands - .getGuildCommands(clientCache.user.id, guildId) - .catch(() => []); - commands.forEach((c) => cache.guildCommands.set({ ...c, guild_id: guildId })); - }, - members: async (guildId: string) => requestGuildMembers(guildId), - commandPermissions: async (guildId: string) => { - if (!clientCache.user) return; - - const keystoreKey = cache.commandPermissions.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const commandPerms = await api.applicationCommands - .getGuildCommandsPermissions(clientCache.user.id, guildId) - .catch(() => []); - - commandPerms.forEach((command) => - command.permissions.forEach((perm) => cache.commandPermissions.set(perm, guildId, command.id)), - ); - }, - - welcomeScreen: async (guildId: string) => { - const guild = await cache.guilds.get(guildId); - if (!guild) return; - - if (!guild.features.includes(GuildFeature.WelcomeScreenEnabled)) { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - } - - const keystoreKey = cache.welcomeScreens.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const welcomeScreen = await api.guilds.getWelcomeScreen(guildId).catch(() => null); - if (!welcomeScreen) return; + const memberCount = clientCache.members.get(guildId) || 0; - cache.welcomeScreens.set(welcomeScreen, guildId); - }, - onboarding: async (guildId: string) => { - const guild = await cache.guilds.get(guildId); - if (!guild) return; + priorityQueue.enqueueGuildTasks(guildId, memberCount); + await priorityQueue.enqueueMembers(guildId, memberCount); - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const onboarding = await api.guilds.getOnboarding(guildId); - cache.onboardings.set(onboarding); - }, - scheduledEvents: async (guildId: string) => { - const keystoreKey = cache.events.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const scheduledEvents = await api.guilds - .getScheduledEvents(guildId, { with_user_count: true }) - .catch(() => []); - scheduledEvents.forEach((e) => cache.events.set(e)); - - const members = ( - await Promise.all(scheduledEvents.map((e) => requestEventSubscribers(e))) - ).flat(); - - members.forEach((u) => { - cache.users.set(u.user); - cache.eventUsers.set( - { - guild_id: guildId, - guild_scheduled_event_id: u.guildScheduledEventId, - user: u.user, - user_id: u.user.id, - member: u.member, - }, - guildId, - ); - - if (u.member) cache.members.set(u.member, guildId); - }); - }, - webhooks: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ( - (perms.response & PermissionFlagsBits.ManageWebhooks) !== - PermissionFlagsBits.ManageWebhooks - ) { - return; - } - - const keystoreKey = cache.webhooks.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const webhooks = await api.guilds.getWebhooks(guildId).catch(() => []); - webhooks.forEach((w) => cache.webhooks.set(w)); - }, - integrations: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - return; - } - - const keystoreKey = cache.integrations.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey); - - const integrations = await api.guilds.getIntegrations(guildId).catch(() => []); - integrations.forEach((i) => cache.integrations.set(i, guildId)); - }, - invites: async (guildId: string) => { - const perms = await getGuildPerms.call(cache, guildId, clientCache.user?.id || '0'); - if ((perms.response & PermissionFlagsBits.ManageGuild) !== PermissionFlagsBits.ManageGuild) { - if ((perms.response & PermissionFlagsBits.ViewAuditLog) !== PermissionFlagsBits.ViewAuditLog) { - return; - } - } - - const keystoreKey = cache.invites.keystore(guildId); - const keys = await cache.cacheDb.hkeys(keystoreKey); - const guildCodestoreKey = cache.invites.codestore(guildId); - const globalCodestoreKey = cache.invites.codestore(); - - const codes = await cache.cacheDb.hkeys(guildCodestoreKey); - - if (keys.length > 0) await cache.cacheDb.del(...keys, keystoreKey, guildCodestoreKey); - if (codes.length > 0) await cache.cacheDb.hdel(globalCodestoreKey, ...codes); - - const invites = await api.guilds.getInvites(guildId).catch(() => []); - invites.forEach((i) => cache.invites.set(i)); - }, + return true; }; diff --git a/src/Util/requestChannelPins.ts b/src/Util/requestChannelPins.ts index c576220..7937847 100644 --- a/src/Util/requestChannelPins.ts +++ b/src/Util/requestChannelPins.ts @@ -1,110 +1,20 @@ -import { getChannelPerms } from '@ayako/utility'; -import { PermissionFlagsBits } from 'discord-api-types/v10'; +import cache from '../BaseClient/Bot/Cache.js'; +import { cache as clientCache } from '../BaseClient/Bot/Client.js'; -import redis from '../BaseClient/Bot/Cache.js'; -import { api, cache } from '../BaseClient/Bot/Client.js'; +import { priorityQueue } from './PriorityQueue/index.js'; const requestChannelPins = async (channelId: string, guildId: string): Promise => { if (!channelId || !guildId) return; - if (cache.requestingPins === channelId) return; - if (cache.requestPinsQueue.has(channelId)) return; - - const [alreadyRequested] = await redis.execPipeline<[string | null]>((pipeline) => { + const [alreadyRequested] = await cache.execPipeline<[string | null]>((pipeline) => { pipeline.hget('channel-pins-requested', channelId); pipeline.hset('channel-pins-requested', channelId, '1'); pipeline.call('hexpire', 'channel-pins-requested', 300, 'NX', 'FIELDS', 1, channelId); }); if (alreadyRequested === '1') return; - cache.requestPinsQueue.add(channelId); - cache.requestPinsGuildMap.set(channelId, guildId); -}; - -const processPinsRequest = async (channelId: string): Promise => { - const guildId = cache.requestPinsGuildMap.get(channelId); - if (!guildId) return; - - const channelPerms = await getChannelPerms.call(redis, guildId, cache.user?.id || '0', channelId); - const readPerms = PermissionFlagsBits.ViewAuditLog | PermissionFlagsBits.ReadMessageHistory; - if ((channelPerms.allow & readPerms) !== readPerms) return; - - await redis.pins.delAll(channelId); - - try { - const pins = await api.channels.getPins(channelId); - - for (let i = 0; i < pins.length; i++) { - const pin = pins[i]; - redis.pins.set(channelId, pin.id); - redis.messages.set(pin, guildId); - (pins as unknown[])[i] = undefined; - } - pins.length = 0; - } catch (error: unknown) { - if (isRateLimitError(error)) { - const retryAfter = getRateLimitRetryAfter(error); - - cache.requestPinsQueue.add(channelId); - cache.requestPinsGuildMap.set(channelId, guildId); - - cache.requestPinsPaused = true; - setTimeout(() => { - cache.requestPinsPaused = false; - }, retryAfter); - } - } -}; - -const isRateLimitError = (error: unknown): boolean => { - if (error && typeof error === 'object' && 'status' in error) { - return (error as { status: number }).status === 429; - } - return false; + const memberCount = clientCache.members.get(guildId) || 0; + priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); }; -const getRateLimitRetryAfter = (error: unknown): number => { - const DEFAULT_RETRY = 5000; - - if (error && typeof error === 'object') { - if ('rawError' in error) { - // eslint-disable-next-line @typescript-eslint/naming-convention - const { rawError } = error as { rawError: { retry_after?: number } }; - if (rawError?.retry_after) { - return Math.ceil(rawError.retry_after * 1000); - } - } - } - - return DEFAULT_RETRY; -}; - -let isProcessingPinsQueue = false; - -const processPinsQueue = async (): Promise => { - if (isProcessingPinsQueue) return; - if (cache.requestingPins) return; - if (cache.requestPinsPaused) return; - if (cache.requestPinsQueue.size === 0) return; - - const [nextChannelId] = cache.requestPinsQueue.values(); - if (!nextChannelId) return; - - isProcessingPinsQueue = true; - cache.requestPinsQueue.delete(nextChannelId); - cache.requestingPins = nextChannelId; - - try { - await processPinsRequest(nextChannelId); - } finally { - isProcessingPinsQueue = false; - cache.requestingPins = null; - cache.requestPinsGuildMap.delete(nextChannelId); - } -}; - -setInterval(() => { - processPinsQueue().catch(() => {}); -}, 500); - export default requestChannelPins; diff --git a/src/Util/requestGuildMembers.ts b/src/Util/requestGuildMembers.ts index ea47cc2..3f3ae86 100644 --- a/src/Util/requestGuildMembers.ts +++ b/src/Util/requestGuildMembers.ts @@ -1,68 +1,10 @@ -import { getRandom } from '@ayako/utility'; -import { GatewayOpcodes } from 'discord-api-types/gateway/v10'; +import { cache } from '../BaseClient/Bot/Client.js'; -import RedisCache from '../BaseClient/Bot/Cache.js'; -import { cache, gateway } from '../BaseClient/Bot/Client.js'; +import { priorityQueue } from './PriorityQueue/index.js'; -import calculateShardId from './calculateShardId.js'; - -const requestGuildMembers = async (guildId: string) => { - if (cache.requestingGuild !== guildId && cache.requestingGuild) { - cache.requestGuildQueue.add(guildId); - return; - } - - cache.requestingGuild = guildId; - - const [isMember] = await RedisCache.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('guild-members-requested', guildId); - pipeline.hset('guild-members-requested', guildId, '1'); - pipeline.call( - 'hexpire', - 'guild-members-requested', - getRandom(604800 / 2, 604800), - 'NX', - 'FIELDS', - 1, - guildId, - ); - }); - - if (isMember === '1') { - cache.requestingGuild = null; - return; - } - - gateway.send(calculateShardId(guildId), { - op: GatewayOpcodes.RequestGuildMembers, - d: { guild_id: guildId, presences: false, limit: 0, query: '' }, - }); -}; - -let isProcessingGuildQueue = false; - -const processGuildQueue = async (): Promise => { - if (isProcessingGuildQueue) return; - if (cache.requestingGuild) return; - if (cache.requestGuildQueue.size === 0) return; - - isProcessingGuildQueue = true; - try { - const [nextGuild] = [...cache.requestGuildQueue.values()] - .map((id) => ({ id, members: cache.members.get(id) || 0 })) - .sort((a, b) => b.members - a.members); - - if (!nextGuild) return; - - cache.requestGuildQueue.delete(nextGuild.id); - await requestGuildMembers(nextGuild.id); - } finally { - isProcessingGuildQueue = false; - } +const requestGuildMembers = async (guildId: string): Promise => { + const memberCount = cache.members.get(guildId) || 0; + await priorityQueue.enqueueMembers(guildId, memberCount); }; -setInterval(() => { - processGuildQueue().catch(() => {}); -}, 100); - export default requestGuildMembers; diff --git a/src/bot.ts b/src/bot.ts index 96acaca..0b1896f 100644 --- a/src/bot.ts +++ b/src/bot.ts @@ -4,6 +4,8 @@ import 'dotenv/config'; import './BaseClient/Bot/Client.js'; import { getInfo } from 'discord-hybrid-sharding'; +import { priorityQueue } from './Util/PriorityQueue/index.js'; + if (process.argv.includes('--debug')) console.log('[Debug] Debug mode enabled'); if (process.argv.includes('--debug-db')) console.log('[Debug] Debug mode for database enabled'); if (process.argv.includes('--warn')) console.log('[Debug] Warn mode enabled'); @@ -14,8 +16,9 @@ const clusterId = getInfo().CLUSTER; setInterval(() => { const stats = heapStats(); const mem = process.memoryUsage(); + const queueStats = priorityQueue.getStats(); console.log( - `[Shard ${clusterId}] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount}`, + `[Shard ${clusterId}] RSS: ${Math.round(mem.rss / 1024 / 1024)}MB | Heap: ${Math.round(stats.heapSize / 1024 / 1024)}MB | Objects: ${stats.objectCount} | GatewayQ: ${queueStats.gatewayQueue} | RestQ: ${queueStats.restQueue} (${queueStats.activeRestRequests} active)`, ); }, 30000); @@ -25,4 +28,6 @@ setInterval(() => { import('./BaseClient/Bot/Events/Process.js'), import('./BaseClient/Bot/Events/Rest.js'), ]); + + priorityQueue.start(); })(); From 4e2b320bd78cf727e159d80ca807b0ea498dc2a5 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 00:47:34 +0100 Subject: [PATCH 13/21] refactor: Implement backpressure handling in RestQueue based on Redis queue size --- src/Util/PriorityQueue/RestQueue.ts | 16 ++++++++++++++++ src/Util/PriorityQueue/types.ts | 6 ++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index bda9c94..0087ba8 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -173,12 +173,28 @@ class RestQueue { if (this.activeRequests >= CONFIG.REST_MAX_CONCURRENT) return; if (this.queue.isEmpty) return; + const redisQueueSize = redis.cacheDb.getQueueSize(); + if (redisQueueSize > CONFIG.REDIS_QUEUE_THRESHOLD) { + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Backpressure: Redis queue ${redisQueueSize} > threshold ${CONFIG.REDIS_QUEUE_THRESHOLD}, pausing`, + ); + return; + } + this.isProcessing = true; try { this.cleanupRateLimits(); while (this.activeRequests < CONFIG.REST_MAX_CONCURRENT && !this.queue.isEmpty) { + const currentQueueSize = redis.cacheDb.getQueueSize(); + if (currentQueueSize > CONFIG.REDIS_QUEUE_THRESHOLD) { + // eslint-disable-next-line no-console + console.log(`[RestQueue] Backpressure mid-loop: Redis queue ${currentQueueSize}, stopping`); + break; + } + const item = this.findNextUnblockedItem(); if (!item) break; diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts index 517fece..25a5a62 100644 --- a/src/Util/PriorityQueue/types.ts +++ b/src/Util/PriorityQueue/types.ts @@ -62,9 +62,11 @@ export const CONFIG = { /** Milliseconds between gateway queue processing */ GATEWAY_INTERVAL: 100, /** Milliseconds between REST queue processing */ - REST_INTERVAL: 50, - /** Maximum concurrent REST requests */ + REST_INTERVAL: 100, + /** Maximum concurrent REST requests*/ REST_MAX_CONCURRENT: 5, /** Default retry after time in ms if not provided in 429 response */ DEFAULT_RETRY_AFTER: 5000, + /** Redis queue size threshold for backpressure - don't start new tasks if above this */ + REDIS_QUEUE_THRESHOLD: 10000, } as const; From 2bac5503fcd0143b63c6d76ca23ab14b1b44b449 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 16:54:05 +0100 Subject: [PATCH 14/21] feat: Implement cold start detection for event emission control --- src/BaseClient/Bot/Client.ts | 2 + src/BaseClient/Bot/Events/ready.ts | 3 + src/Util/EventBus.ts | 7 ++ src/Util/PriorityQueue/ColdStartDetector.ts | 133 ++++++++++++++++++++ src/Util/PriorityQueue/index.ts | 17 +++ 5 files changed, 162 insertions(+) create mode 100644 src/Util/PriorityQueue/ColdStartDetector.ts diff --git a/src/BaseClient/Bot/Client.ts b/src/BaseClient/Bot/Client.ts index 1328ddd..51c37d8 100644 --- a/src/BaseClient/Bot/Client.ts +++ b/src/BaseClient/Bot/Client.ts @@ -60,6 +60,7 @@ export const client = new Client({ rest, gateway }); export const { api } = client; export const cluster = new ClusterClient(client); export const cache: { + approxGuilds: number; guilds: number; members: Map; emojis: Map; @@ -68,6 +69,7 @@ export const cache: { sounds: Map; user: APIUser | null; } = { + approxGuilds: await api.applications.getCurrent().then((app) => app.approximate_guild_count ?? 0), guilds: 0, members: new Map(), emojis: new Map(), diff --git a/src/BaseClient/Bot/Events/ready.ts b/src/BaseClient/Bot/Events/ready.ts index 9f683a8..3eff28e 100644 --- a/src/BaseClient/Bot/Events/ready.ts +++ b/src/BaseClient/Bot/Events/ready.ts @@ -8,6 +8,7 @@ import { import { getInfo } from 'discord-hybrid-sharding'; import { scheduleJob } from 'node-schedule'; +import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; import { cache, client, cluster, gateway } from '../Client.js'; @@ -27,6 +28,8 @@ export default async (data: GatewayReadyDispatchData, shardId: number | string) .SHARD_LIST.map((shard) => shard + 1) .join(', ')}`, ); + + priorityQueue.initializeColdStartDetection(); } console.log(`[Ready | Shard ${shardId}]`); diff --git a/src/Util/EventBus.ts b/src/Util/EventBus.ts index ae7b654..f2ee4f0 100644 --- a/src/Util/EventBus.ts +++ b/src/Util/EventBus.ts @@ -1,11 +1,18 @@ import type { Cache } from '@ayako/utility'; import type { GatewayDispatchEvents, GatewayDispatchPayload } from 'discord-api-types/gateway/v10'; +import { priorityQueue } from './PriorityQueue/index.js'; + export default function ( this: Cache, type: GatewayDispatchEvents, data: GatewayDispatchPayload['d'], ) { + if (priorityQueue.isColdStart) { + this.logger.debug(`[EventBus] Skipping event during cold start: ${type}`); + return; + } + this.logger.debug(`[EventBus] Emitting event: ${type}`); this.cachePub.publish(type, JSON.stringify(data)); diff --git a/src/Util/PriorityQueue/ColdStartDetector.ts b/src/Util/PriorityQueue/ColdStartDetector.ts new file mode 100644 index 0000000..e563d00 --- /dev/null +++ b/src/Util/PriorityQueue/ColdStartDetector.ts @@ -0,0 +1,133 @@ +/** + * Cold Start Detector + * + * Detects when the bot is in a "cold start" state after FLUSHDB or fresh deployment. + * During cold start, event emission to Redis is suppressed to prevent queue overload + * and events being sent with major delay due to paused queue. + * + * Cold start is detected when: + * - guild-interacts, channel-interacts, hashes + * have fewer than 10% of the bot's guild count entries + * + * Cold start ends when: + * - Redis queue size drops below 5000 + */ +import redis from '../../BaseClient/Bot/Cache.js'; +import { cache as clientCache } from '../../BaseClient/Bot/Client.js'; + +class ColdStartDetector { + private isColdstart = false; + private isInitialized = false; + private checkInterval: ReturnType | null = null; + + protected coldStartThresholdPercentage = 0.1; + protected coldStartEndQueueSize = 5000; + protected checkIntervalTime = 1000; + + /** + * Whether the bot is currently in cold start mode + */ + get isColdStart(): boolean { + return this.isColdstart; + } + + /** + * Initialize cold start detection + * Call this after the bot has received guild count from Discord + */ + async initialize(): Promise { + if (this.isInitialized) return; + this.isInitialized = true; + + const guildCount = clientCache.approxGuilds; + if (guildCount === 0) { + this.isColdstart = true; + // eslint-disable-next-line no-console + console.log('[ColdStart] Guild count is 0, assuming cold start'); + this.startEndCheck(); + return; + } + + const threshold = Math.floor(guildCount * this.coldStartThresholdPercentage); + + const guildInteractsSize = (await redis.cacheDb.call('HLEN', 'guild-interacts')) as number; + const isCold = guildInteractsSize < threshold; + + if (isCold) { + this.isColdstart = true; + // eslint-disable-next-line no-console + console.log( + `[ColdStart] Detected cold start | Guilds: ${guildCount} | Threshold: ${threshold} | ` + + `guild-interacts: ${guildInteractsSize} | `, + ); + + this.startEndCheck(); + } else { + // eslint-disable-next-line no-console + console.log( + `[ColdStart] Normal start detected | Guilds: ${guildCount} | ` + + `guild-interacts: ${guildInteractsSize} | `, + ); + } + } + + /** + * Start checking if cold start has ended + */ + private startEndCheck(): void { + if (this.checkInterval) return; + + this.checkInterval = setInterval(() => { + this.checkColdStartEnd(); + }, this.checkIntervalTime); + } + + /** + * Check if cold start should end based on Redis queue size + */ + private checkColdStartEnd(): void { + const queueSize = redis.cacheDb.getQueueSize(); + + if (queueSize < this.coldStartEndQueueSize) { + this.isColdstart = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + + // eslint-disable-next-line no-console + console.log(`[ColdStart] Cold start ended | Redis queue size: ${queueSize}`); + } + } + + /** + * Manually end cold start (for testing or manual override) + */ + endColdStart(): void { + this.isColdstart = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + + // eslint-disable-next-line no-console + console.log('[ColdStart] Cold start manually ended'); + } + + /** + * Reset detector state (for testing) + */ + reset(): void { + this.isColdstart = false; + this.isInitialized = false; + + if (this.checkInterval) { + clearInterval(this.checkInterval); + this.checkInterval = null; + } + } +} + +export const coldStartDetector = new ColdStartDetector(); diff --git a/src/Util/PriorityQueue/index.ts b/src/Util/PriorityQueue/index.ts index e203365..78ecc82 100644 --- a/src/Util/PriorityQueue/index.ts +++ b/src/Util/PriorityQueue/index.ts @@ -21,12 +21,14 @@ * ``` */ +import { coldStartDetector } from './ColdStartDetector.js'; import { gatewayQueue } from './GatewayQueue.js'; import { restQueue } from './RestQueue.js'; import type { GuildTaskName } from './types.js'; export * from './types.js'; export { BinaryHeap } from './BinaryHeap.js'; +export { coldStartDetector } from './ColdStartDetector.js'; export { gatewayQueue } from './GatewayQueue.js'; export { restQueue } from './RestQueue.js'; @@ -113,4 +115,19 @@ export const priorityQueue = { get currentMemberRequestGuild(): string | null { return gatewayQueue.currentGuild; }, + + /** + * Whether the bot is in cold start mode (events should not be emitted) + */ + get isColdStart(): boolean { + return coldStartDetector.isColdStart; + }, + + /** + * Initialize cold start detection + * Call after guild count is known + */ + async initializeColdStartDetection(): Promise { + await coldStartDetector.initialize(); + }, }; From cb49818eb54f8c770772364197d05e77b4e511d9 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 17:45:36 +0100 Subject: [PATCH 15/21] refactor: Enhance GatewayQueue with chunk timeout handling and skip empty guilds --- src/Util/PriorityQueue/GatewayQueue.ts | 28 ++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/Util/PriorityQueue/GatewayQueue.ts b/src/Util/PriorityQueue/GatewayQueue.ts index c116f97..09264a9 100644 --- a/src/Util/PriorityQueue/GatewayQueue.ts +++ b/src/Util/PriorityQueue/GatewayQueue.ts @@ -30,6 +30,8 @@ class GatewayQueue { private currentGuildId: string | null = null; private processingInterval: ReturnType | null = null; private isProcessing = false; + private chunkTimeout: ReturnType | null = null; + private static readonly chunkTimeoutMS = 30000; /** * Get the currently processing guild ID @@ -67,9 +69,15 @@ class GatewayQueue { if (this.processingInterval) { clearInterval(this.processingInterval); this.processingInterval = null; - // eslint-disable-next-line no-console - console.log('[GatewayQueue] Stopped'); } + + if (this.chunkTimeout) { + clearTimeout(this.chunkTimeout); + this.chunkTimeout = null; + } + + // eslint-disable-next-line no-console + console.log('[GatewayQueue] Stopped'); } /** @@ -78,6 +86,8 @@ class GatewayQueue { * @param memberCount Approximate member count for priority */ async enqueue(guildId: string, memberCount: number): Promise { + if (memberCount === 0) return; + if (this.currentGuildId === guildId) return; if (this.queue.has((item) => item.guildId === guildId)) return; @@ -111,6 +121,11 @@ class GatewayQueue { onChunkComplete(guildId: string): void { if (this.currentGuildId === guildId) { this.currentGuildId = null; + + if (this.chunkTimeout) { + clearTimeout(this.chunkTimeout); + this.chunkTimeout = null; + } } } @@ -135,6 +150,15 @@ class GatewayQueue { d: { guild_id: item.guildId, presences: false, limit: 0, query: '' }, }); + this.chunkTimeout = setTimeout(() => { + if (this.currentGuildId === item.guildId) { + // eslint-disable-next-line no-console + console.log(`[GatewayQueue] Timeout waiting for chunks from ${item.guildId}, skipping`); + this.currentGuildId = null; + this.chunkTimeout = null; + } + }, GatewayQueue.chunkTimeoutMS); + // eslint-disable-next-line no-console console.log( `[GatewayQueue] Requested members for ${item.guildId} (${item.memberCount} members) | Queue: ${this.queue.size}`, From fd24852e2fd5e24cf443b75275bde39ef1fccfa8 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 17:45:44 +0100 Subject: [PATCH 16/21] feat: Add completed request count logging to RestQueue --- src/Util/PriorityQueue/RestQueue.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index 0087ba8..476777c 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -41,6 +41,7 @@ class RestQueue { private activeRequests = 0; private processingInterval: ReturnType | null = null; private isProcessing = false; + private completedCount = 0; /** * Get the number of items in the queue @@ -87,6 +88,8 @@ class RestQueue { * Enqueue all guild tasks for first guild interaction */ enqueueGuildTasks(guildId: string, memberCount: number): void { + if (this.queue.has((item) => item.type === 'guild' && item.guildId === guildId)) return; + const now = Date.now(); const guildTasks: GuildTaskName[] = [ 'vcStatus', @@ -269,6 +272,14 @@ class RestQueue { } else { await this.executeChannelTask(item); } + + this.completedCount++; + if (this.completedCount % 10 === 0) { + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Completed ${this.completedCount} requests | Queue: ${this.queue.size} | Active: ${this.activeRequests}`, + ); + } } catch (error: unknown) { if (this.isRateLimitError(error)) { const retryAfter = this.getRateLimitRetryAfter(error); From c1191317ababa054b06200ace623d8dc0cfa6ac2 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 18:42:24 +0100 Subject: [PATCH 17/21] feat: Implement in-flight task tracking for REST queue --- src/Util/PriorityQueue/RestQueue.ts | 55 +++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index 476777c..4298ed1 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -42,6 +42,7 @@ class RestQueue { private processingInterval: ReturnType | null = null; private isProcessing = false; private completedCount = 0; + private inFlight = new Set(); /** * Get the number of items in the queue @@ -84,11 +85,44 @@ class RestQueue { } } + /** + * Generate a unique key for a task (used for in-flight tracking) + */ + private getTaskKey(type: 'guild' | 'channel', id: string, taskName: string): string { + return `${type}:${id}:${taskName}`; + } + + /** + * Check if a guild task is already queued or in-flight + */ + private hasGuildTask(guildId: string, taskName?: string): boolean { + if (taskName) { + if (this.inFlight.has(this.getTaskKey('guild', guildId, taskName))) return true; + } else { + for (const key of this.inFlight) { + if (key.startsWith(`guild:${guildId}:`)) return true; + } + } + return this.queue.has((item) => + item.type === 'guild' && item.guildId === guildId && (taskName ? item.taskName === taskName : true), + ); + } + + /** + * Check if a channel task is already queued or in-flight + */ + private hasChannelTask(channelId: string, taskName: string): boolean { + if (this.inFlight.has(this.getTaskKey('channel', channelId, taskName))) return true; + return this.queue.has( + (item) => item.type === 'channel' && item.id === channelId && item.taskName === taskName, + ); + } + /** * Enqueue all guild tasks for first guild interaction */ enqueueGuildTasks(guildId: string, memberCount: number): void { - if (this.queue.has((item) => item.type === 'guild' && item.guildId === guildId)) return; + if (this.hasGuildTask(guildId)) return; const now = Date.now(); const guildTasks: GuildTaskName[] = [ @@ -121,13 +155,7 @@ class RestQueue { * Enqueue a single guild task (for subsequent updates, not first interaction) */ enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { - if ( - this.queue.has( - (item) => item.type === 'guild' && item.guildId === guildId && item.taskName === taskName, - ) - ) { - return; - } + if (this.hasGuildTask(guildId, taskName)) return; this.queue.push({ type: 'guild', @@ -149,13 +177,7 @@ class RestQueue { memberCount: number, taskName: ChannelTaskName, ): void { - if ( - this.queue.has( - (item) => item.type === 'channel' && item.id === channelId && item.taskName === taskName, - ) - ) { - return; - } + if (this.hasChannelTask(channelId, taskName)) return; this.queue.push({ type: 'channel', @@ -264,6 +286,8 @@ class RestQueue { * Execute a task */ private async executeTask(item: RestQueueItem): Promise { + const taskKey = this.getTaskKey(item.type, item.id, item.taskName); + this.inFlight.add(taskKey); this.activeRequests++; try { @@ -286,6 +310,7 @@ class RestQueue { this.onRateLimit(item, retryAfter); } } finally { + this.inFlight.delete(taskKey); this.activeRequests--; } } From 45016c2b967ff84fcb9fd2fca32e3447bb759839 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Tue, 20 Jan 2026 20:37:18 +0100 Subject: [PATCH 18/21] feat: Add logging for enqueued guild and channel tasks in RestQueue --- src/Util/PriorityQueue/RestQueue.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index 4298ed1..3a3ce85 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -149,6 +149,9 @@ class RestQueue { addedAt: now, }); } + + // eslint-disable-next-line no-console + console.log(`[RestQueue] Enqueued ${guildTasks.length} guild tasks for guilds/${guildId}/* | Queue: ${this.queue.size}`); } /** @@ -157,15 +160,19 @@ class RestQueue { enqueueGuildTask(guildId: string, memberCount: number, taskName: GuildTaskName): void { if (this.hasGuildTask(guildId, taskName)) return; + const endpoint = `guilds/${guildId}/${taskName}`; this.queue.push({ type: 'guild', id: guildId, guildId, memberCount, taskName, - endpoint: `guilds/${guildId}/${taskName}`, + endpoint, addedAt: Date.now(), }); + + // eslint-disable-next-line no-console + console.log(`[RestQueue] Enqueued ${endpoint} | Queue: ${this.queue.size}`); } /** @@ -179,15 +186,19 @@ class RestQueue { ): void { if (this.hasChannelTask(channelId, taskName)) return; + const endpoint = `channels/${channelId}/${taskName}`; this.queue.push({ type: 'channel', id: channelId, guildId, memberCount, taskName, - endpoint: `channels/${channelId}/${taskName}`, + endpoint, addedAt: Date.now(), }); + + // eslint-disable-next-line no-console + console.log(`[RestQueue] Enqueued ${endpoint} | Queue: ${this.queue.size}`); } /** From 518a421263cdcd051c2fb0a8ef5015e9ebdbc04c Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Wed, 21 Jan 2026 01:23:13 +0100 Subject: [PATCH 19/21] feat: Implement bucket-based rate limiting in RestQueue --- src/Util/PriorityQueue/RestQueue.ts | 142 ++++++++++++++++++++++------ src/Util/PriorityQueue/types.ts | 22 ++++- 2 files changed, 133 insertions(+), 31 deletions(-) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index 3a3ce85..c3528d7 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -7,6 +7,7 @@ * - Priority by member count (larger guilds first), then guild > channel */ import { getChannelPerms, getGuildPerms } from '@ayako/utility'; +import { RESTEvents, type RateLimitData } from '@discordjs/rest'; import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; import redis from '../../BaseClient/Bot/Cache.js'; @@ -17,9 +18,9 @@ import requestVoiceChannelStatuses from '../requestVoiceChannelStatuses.js'; import { BinaryHeap } from './BinaryHeap.js'; import { CONFIG, + type BucketState, type ChannelTaskName, type GuildTaskName, - type RateLimitState, type RestQueueItem, } from './types.js'; @@ -37,13 +38,15 @@ const restComparator = (a: RestQueueItem, b: RestQueueItem): number => { class RestQueue { private queue = new BinaryHeap(restComparator); - private rateLimits = new Map(); private activeRequests = 0; private processingInterval: ReturnType | null = null; private isProcessing = false; private completedCount = 0; private inFlight = new Set(); + private buckets = new Map(); + private routeToBucket = new Map(); + /** * Get the number of items in the queue */ @@ -63,6 +66,9 @@ class RestQueue { */ start(): void { if (this.processingInterval) return; + + this.setupRateLimitListener(); + this.processingInterval = setInterval(() => { this.process().catch((err) => { // eslint-disable-next-line no-console @@ -118,6 +124,109 @@ class RestQueue { ); } + //#region Bucket-Based Rate Limiting + + /** + * Normalize an endpoint to a route pattern for bucket grouping + * Converts: "channels/123456789012345678/pins" -> "channels/:id/pins" + */ + private normalizeRoute(endpoint: string): string { + return endpoint + .replace(/\d{17,19}/g, ':id') + .replace(/\/reactions\/(.*)/, '/reactions/:reaction') + .replace(/\/webhooks\/:id\/[^/?]+/, '/webhooks/:id/:token'); + } + + /** + * Generate a bucket key combining method and normalized route + */ + private generateBucketKey(method: string, endpoint: string): string { + return `${method}:${this.normalizeRoute(endpoint)}`; + } + + /** + * Check if an endpoint is currently rate limited based on bucket + */ + private isEndpointBlocked(method: string, endpoint: string): boolean { + const bucketKey = this.generateBucketKey(method, endpoint); + const bucketHash = this.routeToBucket.get(bucketKey); + + if (!bucketHash) return false; // Unknown bucket - allow request + + const bucket = this.buckets.get(bucketHash); + if (!bucket) return false; + + const now = Date.now(); + + // Check if bucket has reset + if (now >= bucket.resetAt) { + bucket.blocked = false; + bucket.remaining = bucket.limit; + return false; + } + + return bucket.blocked; + } + + /** + * Update bucket state from RateLimitData event + */ + private updateBucketFromEvent(data: RateLimitData): void { + const bucketKey = this.generateBucketKey(data.method, data.route); + this.routeToBucket.set(bucketKey, data.hash); + + this.buckets.set(data.hash, { + bucketHash: data.hash, + route: data.route, + method: data.method, + remaining: 0, + limit: data.limit, + resetAt: Date.now() + data.timeToReset, + blocked: true, + scope: data.scope as 'user' | 'global' | 'shared', + }); + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Bucket learned: ${data.route} -> ${data.hash} (resets in ${data.timeToReset}ms)`, + ); + } + + /** + * Update bucket state when we hit a 429 (fallback if event doesn't fire) + */ + private updateBucketFromError(endpoint: string, retryAfter: number): void { + const route = this.normalizeRoute(endpoint); + const bucketKey = this.generateBucketKey('GET', endpoint); + + const existingHash = this.routeToBucket.get(bucketKey); + const bucketHash = existingHash ?? `unknown:${route}`; + + this.routeToBucket.set(bucketKey, bucketHash); + + this.buckets.set(bucketHash, { + bucketHash, + route, + method: 'GET', + remaining: 0, + limit: 1, + resetAt: Date.now() + retryAfter, + blocked: true, + scope: 'user', + }); + } + + /** + * Setup listener for REST rate limit events + */ + private setupRateLimitListener(): void { + api.rest.on(RESTEvents.RateLimited, (data: RateLimitData) => { + this.updateBucketFromEvent(data); + }); + } + + //#endregion + /** * Enqueue all guild tasks for first guild interaction */ @@ -221,8 +330,6 @@ class RestQueue { this.isProcessing = true; try { - this.cleanupRateLimits(); - while (this.activeRequests < CONFIG.REST_MAX_CONCURRENT && !this.queue.isEmpty) { const currentQueueSize = redis.cacheDb.getQueueSize(); if (currentQueueSize > CONFIG.REDIS_QUEUE_THRESHOLD) { @@ -245,16 +352,13 @@ class RestQueue { } /** - * Find the next item that isn't rate limited + * Find the next item that isn't rate limited (using bucket-based checking) */ private findNextUnblockedItem(): RestQueueItem | undefined { const items = this.queue.toArray(); - for (let i = 0; i < items.length; i++) { - const item = items[i]; - const rateLimit = this.rateLimits.get(item.endpoint); - - if (!rateLimit || !rateLimit.paused || Date.now() >= rateLimit.resetAt) { + for (const item of items) { + if (!this.isEndpointBlocked('GET', item.endpoint)) { this.queue.remove((it) => it === item); return item; } @@ -263,27 +367,11 @@ class RestQueue { return undefined; } - /** - * Clean up expired rate limits - */ - private cleanupRateLimits(): void { - const now = Date.now(); - for (const [endpoint, state] of this.rateLimits) { - if (now >= state.resetAt) { - this.rateLimits.delete(endpoint); - } - } - } - /** * Handle rate limit (429) response */ private onRateLimit(item: RestQueueItem, retryAfter: number): void { - this.rateLimits.set(item.endpoint, { - endpoint: item.endpoint, - resetAt: Date.now() + retryAfter, - paused: true, - }); + this.updateBucketFromError(item.endpoint, retryAfter); this.queue.push(item); diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts index 25a5a62..35352b9 100644 --- a/src/Util/PriorityQueue/types.ts +++ b/src/Util/PriorityQueue/types.ts @@ -47,12 +47,26 @@ export type RestQueueItem = { }; /** - * Rate limit state for an endpoint + * Rate limit bucket from Discord + * Multiple routes can share the same bucket (identified by bucketHash) */ -export type RateLimitState = { - endpoint: string; +export type BucketState = { + /** The bucket hash from X-RateLimit-Bucket header, or pseudo-hash for unknown buckets */ + bucketHash: string; + /** Normalized route pattern (e.g., "channels/:id/pins") */ + route: string; + /** HTTP method */ + method: string; + /** Number of requests remaining before rate limit */ + remaining: number; + /** Total requests allowed per window */ + limit: number; + /** Timestamp when the bucket resets (ms since epoch) */ resetAt: number; - paused: boolean; + /** Whether this bucket is currently blocked */ + blocked: boolean; + /** Rate limit scope: 'user', 'global', or 'shared' */ + scope: 'user' | 'global' | 'shared'; }; /** From 4b1f5d3fc1b063059117b7873114fbf45546e187 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Wed, 21 Jan 2026 15:37:30 +0100 Subject: [PATCH 20/21] feat: Implement task execution timeout to prevent hanging tasks in RestQueue --- src/Util/PriorityQueue/RestQueue.ts | 119 +++++++++++++--------------- src/Util/PriorityQueue/types.ts | 2 + 2 files changed, 57 insertions(+), 64 deletions(-) diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index c3528d7..6c1b5d9 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -7,7 +7,7 @@ * - Priority by member count (larger guilds first), then guild > channel */ import { getChannelPerms, getGuildPerms } from '@ayako/utility'; -import { RESTEvents, type RateLimitData } from '@discordjs/rest'; +import { RESTEvents } from '@discordjs/rest'; import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; import redis from '../../BaseClient/Bot/Cache.js'; @@ -109,8 +109,11 @@ class RestQueue { if (key.startsWith(`guild:${guildId}:`)) return true; } } - return this.queue.has((item) => - item.type === 'guild' && item.guildId === guildId && (taskName ? item.taskName === taskName : true), + return this.queue.has( + (item) => + item.type === 'guild' && + item.guildId === guildId && + (taskName ? item.taskName === taskName : true), ); } @@ -169,59 +172,39 @@ class RestQueue { } /** - * Update bucket state from RateLimitData event - */ - private updateBucketFromEvent(data: RateLimitData): void { - const bucketKey = this.generateBucketKey(data.method, data.route); - this.routeToBucket.set(bucketKey, data.hash); - - this.buckets.set(data.hash, { - bucketHash: data.hash, - route: data.route, - method: data.method, - remaining: 0, - limit: data.limit, - resetAt: Date.now() + data.timeToReset, - blocked: true, - scope: data.scope as 'user' | 'global' | 'shared', - }); - - // eslint-disable-next-line no-console - console.log( - `[RestQueue] Bucket learned: ${data.route} -> ${data.hash} (resets in ${data.timeToReset}ms)`, - ); - } - - /** - * Update bucket state when we hit a 429 (fallback if event doesn't fire) - */ - private updateBucketFromError(endpoint: string, retryAfter: number): void { - const route = this.normalizeRoute(endpoint); - const bucketKey = this.generateBucketKey('GET', endpoint); - - const existingHash = this.routeToBucket.get(bucketKey); - const bucketHash = existingHash ?? `unknown:${route}`; - - this.routeToBucket.set(bucketKey, bucketHash); - - this.buckets.set(bucketHash, { - bucketHash, - route, - method: 'GET', - remaining: 0, - limit: 1, - resetAt: Date.now() + retryAfter, - blocked: true, - scope: 'user', - }); - } - - /** - * Setup listener for REST rate limit events + * Setup listener for REST rate limit events via Response event */ private setupRateLimitListener(): void { - api.rest.on(RESTEvents.RateLimited, (data: RateLimitData) => { - this.updateBucketFromEvent(data); + api.rest.on(RESTEvents.Response, (request, response) => { + if (response.status !== 429) return; + + const retryAfter = response.headers.get('retry-after'); + const bucket = response.headers.get('x-ratelimit-bucket'); + const scope = response.headers.get('x-ratelimit-scope') as 'user' | 'global' | 'shared' | null; + + if (!retryAfter) return; + + const retryAfterMs = parseFloat(retryAfter) * 1000; + const route = this.normalizeRoute(request.path); + const bucketKey = this.generateBucketKey(request.method, request.path); + const bucketHash = bucket ?? `unknown:${route}`; + + this.routeToBucket.set(bucketKey, bucketHash); + this.buckets.set(bucketHash, { + bucketHash, + route, + method: request.method, + remaining: 0, + limit: 1, + resetAt: Date.now() + retryAfterMs, + blocked: true, + scope: scope ?? 'user', + }); + + // eslint-disable-next-line no-console + console.log( + `[RestQueue] Bucket learned from 429: ${route} -> ${bucketHash} (retry in ${retryAfterMs}ms)`, + ); }); } @@ -260,7 +243,9 @@ class RestQueue { } // eslint-disable-next-line no-console - console.log(`[RestQueue] Enqueued ${guildTasks.length} guild tasks for guilds/${guildId}/* | Queue: ${this.queue.size}`); + console.log( + `[RestQueue] Enqueued ${guildTasks.length} guild tasks for guilds/${guildId}/* | Queue: ${this.queue.size}`, + ); } /** @@ -368,11 +353,9 @@ class RestQueue { } /** - * Handle rate limit (429) response + * Handle rate limit (429) response - re-queue the item */ private onRateLimit(item: RestQueueItem, retryAfter: number): void { - this.updateBucketFromError(item.endpoint, retryAfter); - this.queue.push(item); // eslint-disable-next-line no-console @@ -382,7 +365,7 @@ class RestQueue { } /** - * Execute a task + * Execute a task with timeout */ private async executeTask(item: RestQueueItem): Promise { const taskKey = this.getTaskKey(item.type, item.id, item.taskName); @@ -390,11 +373,15 @@ class RestQueue { this.activeRequests++; try { - if (item.type === 'guild') { - await this.executeGuildTask(item); - } else { - await this.executeChannelTask(item); - } + const taskPromise = + item.type === 'guild' ? this.executeGuildTask(item) : this.executeChannelTask(item); + + await Promise.race([ + taskPromise, + new Promise((_, reject) => + setTimeout(() => reject(new Error('Task timeout')), CONFIG.TASK_TIMEOUT), + ), + ]); this.completedCount++; if (this.completedCount % 10 === 0) { @@ -407,6 +394,10 @@ class RestQueue { if (this.isRateLimitError(error)) { const retryAfter = this.getRateLimitRetryAfter(error); this.onRateLimit(item, retryAfter); + } else if (error instanceof Error && error.message === 'Task timeout') { + // eslint-disable-next-line no-console + console.log(`[RestQueue] Task timeout: ${item.endpoint} - re-queuing`); + this.queue.push(item); } } finally { this.inFlight.delete(taskKey); diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts index 35352b9..ec1564d 100644 --- a/src/Util/PriorityQueue/types.ts +++ b/src/Util/PriorityQueue/types.ts @@ -83,4 +83,6 @@ export const CONFIG = { DEFAULT_RETRY_AFTER: 5000, /** Redis queue size threshold for backpressure - don't start new tasks if above this */ REDIS_QUEUE_THRESHOLD: 10000, + /** Task execution timeout in ms - prevents hanging tasks from blocking the queue */ + TASK_TIMEOUT: 30000, } as const; From 2f9fe5fcba09194f146d0716dd598f2f431e3d55 Mon Sep 17 00:00:00 2001 From: Lars_und_so <46791248+Larsundso@users.noreply.github.com> Date: Wed, 21 Jan 2026 16:34:45 +0100 Subject: [PATCH 21/21] feat: Remove firstChannelInteraction and related channel tasks --- .../Bot/CacheHandlers/AutoModeration.ts | 2 - src/BaseClient/Bot/CacheHandlers/Channel.ts | 16 +-- src/BaseClient/Bot/CacheHandlers/Message.ts | 11 -- src/BaseClient/Bot/CacheHandlers/Stage.ts | 4 - src/BaseClient/Bot/CacheHandlers/Thread.ts | 11 +- src/BaseClient/Bot/CacheHandlers/Voice.ts | 3 - src/BaseClient/Bot/CacheHandlers/index.ts | 5 - src/Util/PriorityQueue/RestQueue.ts | 104 ++---------------- src/Util/PriorityQueue/index.ts | 11 -- src/Util/PriorityQueue/types.ts | 9 +- src/Util/firstChannelInteraction.ts | 32 ------ src/Util/requestChannelPins.ts | 20 ---- 12 files changed, 14 insertions(+), 214 deletions(-) delete mode 100644 src/Util/firstChannelInteraction.ts delete mode 100644 src/Util/requestChannelPins.ts diff --git a/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts b/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts index 6f4b071..34fa716 100644 --- a/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts +++ b/src/BaseClient/Bot/CacheHandlers/AutoModeration.ts @@ -6,7 +6,6 @@ import { type GatewayAutoModerationRuleUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -15,7 +14,6 @@ export default { data: GatewayAutoModerationActionExecutionDispatchData, ) => { firstGuildInteraction(data.guild_id); - if (data.channel_id) firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.AutoModerationRuleCreate]: async ( diff --git a/src/BaseClient/Bot/CacheHandlers/Channel.ts b/src/BaseClient/Bot/CacheHandlers/Channel.ts index 54437cc..56cf1dc 100644 --- a/src/BaseClient/Bot/CacheHandlers/Channel.ts +++ b/src/BaseClient/Bot/CacheHandlers/Channel.ts @@ -6,8 +6,6 @@ import { type GatewayChannelUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; -import requestChannelPins from '../../../Util/requestChannelPins.js'; import redis from '../Cache.js'; export default { @@ -33,24 +31,14 @@ export default { await pipeline.exec(); }, - [GatewayDispatchEvents.ChannelPinsUpdate]: async (data: GatewayChannelPinsUpdateDispatchData) => { - if (!data.guild_id) return; - - const success = await firstChannelInteraction(data.channel_id, data.guild_id); - if (success) return; - - await requestChannelPins(data.channel_id, data.guild_id); - }, + [GatewayDispatchEvents.ChannelPinsUpdate]: async (_: GatewayChannelPinsUpdateDispatchData) => {}, [GatewayDispatchEvents.ChannelUpdate]: async (data: GatewayChannelUpdateDispatchData) => { - firstChannelInteraction(data.id, data.guild_id); redis.channels.set(data); }, // eslint-disable-next-line @typescript-eslint/naming-convention VOICE_CHANNEL_STATUS_UPDATE: async (data: { status: string; id: string; guild_id: string }) => { - firstChannelInteraction(data.id, data.guild_id); - if (!data.status?.length) { redis.channelStatus.del(data.guild_id, data.id); return; @@ -65,8 +53,6 @@ export default { guild_id: string; channels: { status: string; id: string }[]; }) => { - await Promise.all(data.channels.map(async (c) => firstChannelInteraction(c.id, data.guild_id))); - await redis.channelStatus.delAll(data.guild_id); data.channels.forEach((c) => { diff --git a/src/BaseClient/Bot/CacheHandlers/Message.ts b/src/BaseClient/Bot/CacheHandlers/Message.ts index 72fbd21..08577ed 100644 --- a/src/BaseClient/Bot/CacheHandlers/Message.ts +++ b/src/BaseClient/Bot/CacheHandlers/Message.ts @@ -13,7 +13,6 @@ import { import { AllThreadGuildChannelTypes } from '../../../Typings/Channel.js'; import evalFn from '../../../Util/eval.js'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -23,7 +22,6 @@ export default { redis.messages.set(data, data.guild_id || '@me'); firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } if (!data.webhook_id) redis.users.set(data.author); @@ -39,7 +37,6 @@ export default { [GatewayDispatchEvents.MessageDelete]: async (data: GatewayMessageDeleteDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } redis.messages.del(data.channel_id, data.id); @@ -49,7 +46,6 @@ export default { [GatewayDispatchEvents.MessageDeleteBulk]: async (data: GatewayMessageDeleteBulkDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } data.ids.forEach((id) => { @@ -61,7 +57,6 @@ export default { [GatewayDispatchEvents.MessageUpdate]: async (data: GatewayMessageUpdateDispatchData) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.messages.set(data, data.guild_id); } @@ -70,7 +65,6 @@ export default { [GatewayDispatchEvents.MessagePollVoteAdd]: async (data: GatewayMessagePollVoteDispatchData) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.MessagePollVoteRemove]: async ( @@ -78,7 +72,6 @@ export default { ) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.MessageReactionAdd]: async ( @@ -91,7 +84,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); const cache = await redis.reactions.get( data.channel_id, @@ -125,7 +117,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); const cache = await redis.reactions.get( data.channel_id, @@ -159,7 +150,6 @@ export default { ) => { if (data.guild_id) { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); } const reactionKeys = await redis.cacheDb.hscanKeys( @@ -180,7 +170,6 @@ export default { ) => { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); const emojiId = data.emoji.id || data.emoji.name; const reactionKeys = await redis.cacheDb.hscanKeys( diff --git a/src/BaseClient/Bot/CacheHandlers/Stage.ts b/src/BaseClient/Bot/CacheHandlers/Stage.ts index 48360aa..c499bef 100644 --- a/src/BaseClient/Bot/CacheHandlers/Stage.ts +++ b/src/BaseClient/Bot/CacheHandlers/Stage.ts @@ -5,7 +5,6 @@ import { type GatewayStageInstanceUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -14,7 +13,6 @@ export default { data: GatewayStageInstanceCreateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.set(data); }, @@ -23,7 +21,6 @@ export default { data: GatewayStageInstanceDeleteDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.del(data.id); }, @@ -32,7 +29,6 @@ export default { data: GatewayStageInstanceUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.stages.set(data); }, diff --git a/src/BaseClient/Bot/CacheHandlers/Thread.ts b/src/BaseClient/Bot/CacheHandlers/Thread.ts index b646870..17bd2f3 100644 --- a/src/BaseClient/Bot/CacheHandlers/Thread.ts +++ b/src/BaseClient/Bot/CacheHandlers/Thread.ts @@ -8,7 +8,6 @@ import { type GatewayThreadUpdateDispatchData, } from 'discord-api-types/gateway/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -49,13 +48,7 @@ export default { }, [GatewayDispatchEvents.ThreadUpdate]: async (data: GatewayThreadUpdateDispatchData) => { - if (!data.guild_id) { - redis.threads.set(data); - return; - } - - firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.id, data.guild_id); + if (data.guild_id) firstGuildInteraction(data.guild_id); redis.threads.set(data); }, @@ -79,7 +72,6 @@ export default { data: GatewayThreadMembersUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.id, data.guild_id); data.added_members?.forEach((threadMember) => { redis.threadMembers.set(threadMember, data.guild_id); @@ -95,7 +87,6 @@ export default { data: GatewayThreadMemberUpdateDispatchData, ) => { firstGuildInteraction(data.guild_id); - if (data.id) firstChannelInteraction(data.id, data.guild_id); redis.threadMembers.set(data, data.guild_id); diff --git a/src/BaseClient/Bot/CacheHandlers/Voice.ts b/src/BaseClient/Bot/CacheHandlers/Voice.ts index a9a7db5..70544b2 100644 --- a/src/BaseClient/Bot/CacheHandlers/Voice.ts +++ b/src/BaseClient/Bot/CacheHandlers/Voice.ts @@ -5,7 +5,6 @@ import { type GatewayVoiceServerUpdateDispatchData, } from 'discord-api-types/v10'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import redis from '../Cache.js'; @@ -14,7 +13,6 @@ export default { data: GatewayVoiceChannelEffectSendDispatchData, ) => { firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); }, [GatewayDispatchEvents.VoiceServerUpdate]: async (data: GatewayVoiceServerUpdateDispatchData) => { @@ -25,7 +23,6 @@ export default { if (!data.guild_id) return; firstGuildInteraction(data.guild_id); - if (data.channel_id) firstChannelInteraction(data.channel_id, data.guild_id); redis.voices.set(data); }, diff --git a/src/BaseClient/Bot/CacheHandlers/index.ts b/src/BaseClient/Bot/CacheHandlers/index.ts index 78b4f1e..4e405b8 100644 --- a/src/BaseClient/Bot/CacheHandlers/index.ts +++ b/src/BaseClient/Bot/CacheHandlers/index.ts @@ -21,7 +21,6 @@ import { AllThreadGuildChannelTypes, } from '../../../Typings/Channel.js'; import emit from '../../../Util/EventBus.js'; -import firstChannelInteraction from '../../../Util/firstChannelInteraction.js'; import firstGuildInteraction from '../../../Util/firstGuildInteraction.js'; import { priorityQueue } from '../../../Util/PriorityQueue/index.js'; import redis from '../Cache.js'; @@ -94,9 +93,6 @@ const caches: Record< [GatewayDispatchEvents.InteractionCreate]: async (data: GatewayInteractionCreateDispatchData) => { if (data.guild_id) firstGuildInteraction(data.guild_id); - if (data.channel?.id && data.guild_id) { - firstChannelInteraction(data.channel.id, data.guild_id); - } if (data.user) redis.users.set(data.user); if (data.message && data.guild_id) { @@ -133,7 +129,6 @@ const caches: Record< [GatewayDispatchEvents.TypingStart]: async (data: GatewayTypingStartDispatchData) => { if (!data.member || !data.guild_id) return; firstGuildInteraction(data.guild_id); - firstChannelInteraction(data.channel_id, data.guild_id); redis.members.set(data.member, data.guild_id); }, diff --git a/src/Util/PriorityQueue/RestQueue.ts b/src/Util/PriorityQueue/RestQueue.ts index 6c1b5d9..bdd443c 100644 --- a/src/Util/PriorityQueue/RestQueue.ts +++ b/src/Util/PriorityQueue/RestQueue.ts @@ -4,9 +4,9 @@ * - Max 5 concurrent requests (configurable) * - Per-endpoint rate limit tracking * - Re-queues on 429 with retry_after delay - * - Priority by member count (larger guilds first), then guild > channel + * - Priority by member count (larger guilds first) */ -import { getChannelPerms, getGuildPerms } from '@ayako/utility'; +import { getGuildPerms } from '@ayako/utility'; import { RESTEvents } from '@discordjs/rest'; import { GuildFeature, PermissionFlagsBits } from 'discord-api-types/v10'; @@ -16,23 +16,15 @@ import requestEventSubscribers from '../requestEventSubscribers.js'; import requestVoiceChannelStatuses from '../requestVoiceChannelStatuses.js'; import { BinaryHeap } from './BinaryHeap.js'; -import { - CONFIG, - type BucketState, - type ChannelTaskName, - type GuildTaskName, - type RestQueueItem, -} from './types.js'; +import { CONFIG, type BucketState, type GuildTaskName, type RestQueueItem } from './types.js'; /** * Priority comparator for REST queue items * Higher member count = higher priority - * Equal member count: guild > channel - * Equal type: FIFO by addedAt + * Equal member count: FIFO by addedAt */ const restComparator = (a: RestQueueItem, b: RestQueueItem): number => { if (a.memberCount !== b.memberCount) return b.memberCount - a.memberCount; - if (a.type !== b.type) return a.type === 'guild' ? -1 : 1; return a.addedAt - b.addedAt; }; @@ -94,8 +86,8 @@ class RestQueue { /** * Generate a unique key for a task (used for in-flight tracking) */ - private getTaskKey(type: 'guild' | 'channel', id: string, taskName: string): string { - return `${type}:${id}:${taskName}`; + private getTaskKey(id: string, taskName: string): string { + return `guild:${id}:${taskName}`; } /** @@ -103,27 +95,14 @@ class RestQueue { */ private hasGuildTask(guildId: string, taskName?: string): boolean { if (taskName) { - if (this.inFlight.has(this.getTaskKey('guild', guildId, taskName))) return true; + if (this.inFlight.has(this.getTaskKey(guildId, taskName))) return true; } else { for (const key of this.inFlight) { if (key.startsWith(`guild:${guildId}:`)) return true; } } return this.queue.has( - (item) => - item.type === 'guild' && - item.guildId === guildId && - (taskName ? item.taskName === taskName : true), - ); - } - - /** - * Check if a channel task is already queued or in-flight - */ - private hasChannelTask(channelId: string, taskName: string): boolean { - if (this.inFlight.has(this.getTaskKey('channel', channelId, taskName))) return true; - return this.queue.has( - (item) => item.type === 'channel' && item.id === channelId && item.taskName === taskName, + (item) => item.guildId === guildId && (taskName ? item.taskName === taskName : true), ); } @@ -269,32 +248,6 @@ class RestQueue { console.log(`[RestQueue] Enqueued ${endpoint} | Queue: ${this.queue.size}`); } - /** - * Enqueue a channel task (e.g., pins) for first channel interaction - */ - enqueueChannelTask( - channelId: string, - guildId: string, - memberCount: number, - taskName: ChannelTaskName, - ): void { - if (this.hasChannelTask(channelId, taskName)) return; - - const endpoint = `channels/${channelId}/${taskName}`; - this.queue.push({ - type: 'channel', - id: channelId, - guildId, - memberCount, - taskName, - endpoint, - addedAt: Date.now(), - }); - - // eslint-disable-next-line no-console - console.log(`[RestQueue] Enqueued ${endpoint} | Queue: ${this.queue.size}`); - } - /** * Process items from the queue */ @@ -368,16 +321,13 @@ class RestQueue { * Execute a task with timeout */ private async executeTask(item: RestQueueItem): Promise { - const taskKey = this.getTaskKey(item.type, item.id, item.taskName); + const taskKey = this.getTaskKey(item.id, item.taskName); this.inFlight.add(taskKey); this.activeRequests++; try { - const taskPromise = - item.type === 'guild' ? this.executeGuildTask(item) : this.executeChannelTask(item); - await Promise.race([ - taskPromise, + this.executeGuildTask(item), new Promise((_, reject) => setTimeout(() => reject(new Error('Task timeout')), CONFIG.TASK_TIMEOUT), ), @@ -457,15 +407,6 @@ class RestQueue { } } - /** - * Execute a channel task - */ - private async executeChannelTask(item: RestQueueItem): Promise { - if (item.taskName === 'pins') { - await this.taskPins(item.id, item.guildId); - } - } - //#region Guild Tasks private async taskAutoModRules(guildId: string): Promise { @@ -628,31 +569,6 @@ class RestQueue { invites.forEach((i) => redis.invites.set(i)); } - //#region Channel Tasks - - private async taskPins(channelId: string, guildId: string): Promise { - const channelPerms = await getChannelPerms.call( - redis, - guildId, - clientCache.user?.id || '0', - channelId, - ); - const readPerms = PermissionFlagsBits.ViewAuditLog | PermissionFlagsBits.ReadMessageHistory; - if ((channelPerms.allow & readPerms) !== readPerms) return; - - await redis.pins.delAll(channelId); - - const pins = await api.channels.getPins(channelId); - - for (let i = 0; i < pins.length; i++) { - const pin = pins[i]; - redis.pins.set(channelId, pin.id); - redis.messages.set(pin, guildId); - (pins as unknown[])[i] = undefined; - } - pins.length = 0; - } - //#region Utilities private isRateLimitError(error: unknown): boolean { diff --git a/src/Util/PriorityQueue/index.ts b/src/Util/PriorityQueue/index.ts index 78ecc82..58405e8 100644 --- a/src/Util/PriorityQueue/index.ts +++ b/src/Util/PriorityQueue/index.ts @@ -14,7 +14,6 @@ * // Enqueue requests * priorityQueue.enqueueGuildTasks(guildId, memberCount); * priorityQueue.enqueueMembers(guildId, memberCount); - * priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); * * // Notify when member chunks complete * priorityQueue.onMemberChunkComplete(guildId); @@ -80,16 +79,6 @@ export const priorityQueue = { await gatewayQueue.enqueue(guildId, memberCount); }, - /** - * Enqueue channel pins request - * @param channelId Channel ID - * @param guildId Guild ID - * @param memberCount Guild member count for priority - */ - enqueueChannelPins(channelId: string, guildId: string, memberCount: number): void { - restQueue.enqueueChannelTask(channelId, guildId, memberCount, 'pins'); - }, - /** * Notify that member chunks have completed for a guild * @param guildId Guild ID diff --git a/src/Util/PriorityQueue/types.ts b/src/Util/PriorityQueue/types.ts index ec1564d..10caedc 100644 --- a/src/Util/PriorityQueue/types.ts +++ b/src/Util/PriorityQueue/types.ts @@ -28,20 +28,15 @@ export type GuildTaskName = | 'invites' | 'vcStatus'; -/** - * Task names for REST API calls on first channel interaction - */ -export type ChannelTaskName = 'pins'; - /** * Item in the REST queue */ export type RestQueueItem = { - type: 'guild' | 'channel'; + type: 'guild'; id: string; guildId: string; memberCount: number; - taskName: GuildTaskName | ChannelTaskName; + taskName: GuildTaskName; endpoint: string; addedAt: number; }; diff --git a/src/Util/firstChannelInteraction.ts b/src/Util/firstChannelInteraction.ts deleted file mode 100644 index e7282f8..0000000 --- a/src/Util/firstChannelInteraction.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { getRandom } from '@ayako/utility'; - -import cache from '../BaseClient/Bot/Cache.js'; -import { cache as clientCache } from '../BaseClient/Bot/Client.js'; - -import { priorityQueue } from './PriorityQueue/index.js'; - -export default async (channelId: string, guildId: string) => { - if (!channelId) return false; - - const [isMember] = await cache.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('channel-interacts', channelId); - pipeline.hset('channel-interacts', channelId, '1'); - pipeline.call( - 'hexpire', - 'channel-interacts', - getRandom(604800 / 2, 604800), - 'NX', - 'FIELDS', - 1, - channelId, - ); - }); - if (isMember === '1') return false; - - if (!guildId) return false; - - const memberCount = clientCache.members.get(guildId) || 0; - priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); - - return true; -}; diff --git a/src/Util/requestChannelPins.ts b/src/Util/requestChannelPins.ts deleted file mode 100644 index 7937847..0000000 --- a/src/Util/requestChannelPins.ts +++ /dev/null @@ -1,20 +0,0 @@ -import cache from '../BaseClient/Bot/Cache.js'; -import { cache as clientCache } from '../BaseClient/Bot/Client.js'; - -import { priorityQueue } from './PriorityQueue/index.js'; - -const requestChannelPins = async (channelId: string, guildId: string): Promise => { - if (!channelId || !guildId) return; - - const [alreadyRequested] = await cache.execPipeline<[string | null]>((pipeline) => { - pipeline.hget('channel-pins-requested', channelId); - pipeline.hset('channel-pins-requested', channelId, '1'); - pipeline.call('hexpire', 'channel-pins-requested', 300, 'NX', 'FIELDS', 1, channelId); - }); - if (alreadyRequested === '1') return; - - const memberCount = clientCache.members.get(guildId) || 0; - priorityQueue.enqueueChannelPins(channelId, guildId, memberCount); -}; - -export default requestChannelPins;