From 89312db67df4ebb04c32a8865711cb16eadafc4c Mon Sep 17 00:00:00 2001 From: Saniddhya Dubey Date: Sat, 11 Apr 2026 16:29:49 -0400 Subject: [PATCH] feat: implement EWMA rate limiter with strategy support (#404) --- .changeset/README.md | 8 +++ .changeset/config.json | 2 +- .changeset/light-lilies-yawn.md | 5 ++ .changeset/slimy-bars-burn.md | 5 ++ CONFIGURATION.md | 40 +++-------- resources/default-settings.yaml | 3 + src/@types/adapters.ts | 4 ++ src/@types/settings.ts | 5 ++ src/adapters/redis-adapter.ts | 33 ++++++++- src/adapters/web-socket-adapter.ts | 4 +- .../get-admission-check-controller-factory.ts | 4 +- .../post-invoice-controller-factory.ts | 4 +- src/factories/message-handler-factory.ts | 4 +- src/factories/rate-limiter-factory.ts | 16 ++++- src/factories/websocket-adapter-factory.ts | 4 +- src/handlers/event-message-handler.ts | 4 +- .../rate-limiter-middleware.ts | 4 +- src/utils/ewma-rate-limiter.ts | 64 +++++++++++++++++ .../rate-limiter/rate-limiter.feature | 7 ++ .../features/rate-limiter/rate-limiter.ts | 65 +++++++++++++++++ test/unit/adapters/redis-adapter.spec.ts | 71 ++++++++++++++++++ .../handlers/event-message-handler.spec.ts | 18 ++--- test/unit/utils/ewma-rate-limiter.spec.ts | 72 +++++++++++++++++++ .../utils/sliding-window-rate-limiter.spec.ts | 2 +- 24 files changed, 388 insertions(+), 60 deletions(-) create mode 100644 .changeset/README.md create mode 100644 .changeset/light-lilies-yawn.md create mode 100644 .changeset/slimy-bars-burn.md create mode 100644 src/utils/ewma-rate-limiter.ts create mode 100644 test/integration/features/rate-limiter/rate-limiter.feature create mode 100644 test/integration/features/rate-limiter/rate-limiter.ts create mode 100644 test/unit/utils/ewma-rate-limiter.spec.ts diff --git a/.changeset/README.md b/.changeset/README.md new file mode 100644 index 00000000..654c6d47 --- /dev/null +++ b/.changeset/README.md @@ -0,0 +1,8 @@ +# Changesets + +Hello and welcome! This folder has been automatically generated by `@changesets/cli`, a build tool that works +with multi-package repos, or single-package repos to help you version and publish your code. You can +find the full documentation for it [in our repository](https://github.com/changesets/changesets). + +We have a quick list of common questions to get you started engaging with this project in +[our documentation](https://github.com/changesets/changesets/blob/main/docs/common-questions.md). diff --git a/.changeset/config.json b/.changeset/config.json index d88011f6..5c58ec9f 100644 --- a/.changeset/config.json +++ b/.changeset/config.json @@ -1,5 +1,5 @@ { - "$schema": "https://unpkg.com/@changesets/config@3.1.1/schema.json", + "$schema": "https://unpkg.com/@changesets/config@3.1.4/schema.json", "changelog": "@changesets/cli/changelog", "commit": false, "fixed": [], diff --git a/.changeset/light-lilies-yawn.md b/.changeset/light-lilies-yawn.md new file mode 100644 index 00000000..faabbcc8 --- /dev/null +++ b/.changeset/light-lilies-yawn.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +Add EWMA rate limiter with configurable strategy support diff --git a/.changeset/slimy-bars-burn.md b/.changeset/slimy-bars-burn.md new file mode 100644 index 00000000..4976c6dd --- /dev/null +++ b/.changeset/slimy-bars-burn.md @@ -0,0 +1,5 @@ +--- +"nostream": minor +--- + +Add EWMA rate limiter with strategy support diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 2826569d..34367f2c 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -109,40 +109,18 @@ The settings below are listed in alphabetical order by name. Please keep this ta | | Defaults to zero. Disabled when set to zero. | | limits.event.pubkey.whitelist | List of public keys to always allow. Only public keys in this list will be able to post to this relay. Use for private relays. | | limits.event.rateLimits[].kinds | List of event kinds rate limited. Use `[min, max]` for ranges. Optional. | -| limits.event.rateLimits[].period | Rate limiting period in milliseconds. | +| limits.event.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window during which requests are counted. For `ewma`: the half-life of the exponential decay — shorter values forget bursts faster, longer values are stricter on bursty clients. | | limits.event.rateLimits[].rate | Maximum number of events during period. | | limits.event.retention.kind.whitelist | Event kinds excluded from retention purge. NIP-62 `REQUEST_TO_VANISH` is always excluded from retention purge, even if not listed here. | | limits.event.retention.maxDays | Maximum number of days to retain events. Purge deletes events that are expired (`expires_at`), soft-deleted (`deleted_at`), or older than this window (`created_at`). Any non-positive value disables retention purge. | | limits.event.retention.pubkey.whitelist | Public keys excluded from retention purge. | | limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. | -| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. | -| limits.message.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. | -| limits.message.rateLimits[].period | Rate limit period in milliseconds. | +| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. | +| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. | +| limits.message.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window. For `ewma`: the half-life of the decay function. | | limits.message.rateLimits[].rate | Maximum number of messages during period. | -| mirroring.static[].address | Address of mirrored relay. (e.g. ws://100.100.100.100:8008) | -| mirroring.static[].filters | Subscription filters used to mirror. | -| mirroring.static[].limits.event | Event limit overrides for this mirror. See configurations under limits.event. | -| mirroring.static[].secret | Secret to pass to relays. Nostream relays only. Optional. | -| mirroring.static[].skipAdmissionCheck | Disable the admission fee check for events coming from this mirror. | -| network.maxPayloadSize | Maximum number of bytes accepted per WebSocket frame | -| network.remoteIpHeader | HTTP header from proxy containing IP address from client. | -| nip05.domainBlacklist | List of domains blocked from NIP-05 verification. Authors with NIP-05 at these domains will be rejected. | -| nip05.domainWhitelist | List of domains allowed for NIP-05 verification. If set, only authors verified at these domains can publish. | -| nip05.maxConsecutiveFailures | Number of consecutive verification failures before giving up on an author. Defaults to 20. | -| nip05.mode | NIP-05 verification mode: `enabled` requires verification, `passive` verifies without blocking, `disabled` does nothing. Defaults to `disabled`. | -| nip05.verifyExpiration | Time in milliseconds before a successful NIP-05 verification expires and needs re-checking. Defaults to 604800000 (1 week). | -| nip05.verifyUpdateFrequency | Minimum interval in milliseconds between re-verification attempts for a given author. Defaults to 86400000 (24 hours). | -| paymentProcessors.lnbits.baseURL | Base URL of your Lnbits instance. | -| paymentProcessors.lnbits.callbackBaseURL | Public-facing Nostream's Lnbits Callback URL. (e.g. https://relay.your-domain.com/callbacks/lnbits) | -| paymentProcessors.lnurl.invoiceURL | [LUD-06 Pay Request](https://github.com/lnurl/luds/blob/luds/06.md) provider URL. (e.g. https://getalby.com/lnurlp/your-username) | -| paymentProcessors.zebedee.baseURL | Zebedee's API base URL. | -| paymentProcessors.zebedee.callbackBaseURL | Public-facing Nostream's Zebedee Callback URL (e.g. https://relay.your-domain.com/callbacks/zebedee) | -| paymentProcessors.zebedee.ipWhitelist | List with Zebedee's API Production IPs. See [ZBD API Documentation](https://api-reference.zebedee.io/#c7e18276-6935-4cca-89ae-ad949efe9a6a) for more info. | -| payments.enabled | Enabled payments. Defaults to false. | -| payments.feeSchedules.admission[].amount | Admission fee amount in msats. | -| payments.feeSchedules.admission[].enabled | Enables admission fee. Defaults to false. | -| payments.feeSchedules.admission[].whitelists.event_kinds | List of event kinds to waive admission fee. Use `[min, max]` for ranges. | -| payments.feeSchedules.admission[].whitelists.pubkeys | List of pubkeys to waive admission fee. | -| payments.processor | Either `zebedee`, `lnbits`, `lnurl`. | -| workers.count | Number of workers to spin up to handle incoming connections. | -| | Spin workers as many CPUs are available when set to zero. Defaults to zero. | +| limits.message.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. | +| limits.admissionCheck.rateLimits[].period | Rate limiting period in milliseconds. For `sliding_window`: the time window. For `ewma`: the half-life of the decay function. | +| limits.admissionCheck.rateLimits[].rate | Maximum number of admission checks during period. | +| limits.admissionCheck.ipWhitelist | List of IPs (IPv4 or IPv6) to ignore rate limits. | +| limits.rateLimiter.strategy | Rate limiting strategy. Either `ewma` or `sliding_window`. Defaults to `ewma`. When using `ewma`, the `period` field in each rate limit serves as the half-life for the exponential decay function. Note: when switching from `sliding_window` to `ewma`, consider increasing `rate` values slightly as EWMA penalizes bursty behavior more aggressively. | diff --git a/resources/default-settings.yaml b/resources/default-settings.yaml index 2a659de5..7f0ea0ef 100755 --- a/resources/default-settings.yaml +++ b/resources/default-settings.yaml @@ -64,6 +64,9 @@ workers: mirroring: static: [] limits: + # strategy selection configuration for rate limiting: + rateLimiter: + strategy: ewma invoice: rateLimits: - period: 60000 diff --git a/src/@types/adapters.ts b/src/@types/adapters.ts index c346adb0..130d7853 100644 --- a/src/@types/adapters.ts +++ b/src/@types/adapters.ts @@ -25,4 +25,8 @@ export interface ICacheAdapter { removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise getRangeFromSortedSet(key: string, start: number, stop: number): Promise setKeyExpiry(key: string, expiry: number): Promise + deleteKey(key: string): Promise + getHKey(key: string, field: string): Promise + setHKey(key: string, fields: Record): Promise + eval(script: string, keys: string[], args: string[]): Promise } diff --git a/src/@types/settings.ts b/src/@types/settings.ts index 6f2691b0..8c36fe8a 100644 --- a/src/@types/settings.ts +++ b/src/@types/settings.ts @@ -22,6 +22,10 @@ export interface RateLimit { rate: number } +export interface RateLimiterSettings { + strategy: 'ewma' | 'sliding_window' +} + export interface EventIdLimits { minLeadingZeroBits?: number } @@ -133,6 +137,7 @@ export interface AdmissionCheckLimits { } export interface Limits { + rateLimiter?: RateLimiterSettings invoice?: InvoiceLimits admissionCheck?: AdmissionCheckLimits connection?: ConnectionLimits diff --git a/src/adapters/redis-adapter.ts b/src/adapters/redis-adapter.ts index a3816588..d7b187c2 100644 --- a/src/adapters/redis-adapter.ts +++ b/src/adapters/redis-adapter.ts @@ -5,10 +5,13 @@ import { ICacheAdapter } from '../@types/adapters' const debug = createLogger('redis-adapter') export class RedisAdapter implements ICacheAdapter { + private connection: Promise + private scriptShas: Map = new Map() + public constructor(private readonly client: CacheClient) { - this.connection = client.connect() + this.connection = client.isOpen ? Promise.resolve() : client.connect() this.connection.catch((error) => this.onClientError(error)) @@ -92,4 +95,32 @@ export class RedisAdapter implements ICacheAdapter { return this.client.zAdd(key, members) } + + public async deleteKey(key: string): Promise { + await this.connection + debug('delete %s key', key) + return this.client.del(key) + } + + public async getHKey(key: string, field: string): Promise { + await this.connection + debug('get %s field for key %s', field, key) + return await this.client.hGet(key, field) ?? '' + } + + public async setHKey(key: string, fields: Record): Promise { + await this.connection + debug('set %s key', key) + return await this.client.hSet(key, fields) >= 0 + } + + public async eval(script: string, keys: string[], args: string[]): Promise { + await this.connection + if (!this.scriptShas.has(script)) { + const sha = await this.client.scriptLoad(script) + this.scriptShas.set(script, sha) + } + return await this.client.evalSha(this.scriptShas.get(script)!, { keys, arguments: args }) + } + } diff --git a/src/adapters/web-socket-adapter.ts b/src/adapters/web-socket-adapter.ts index e436c013..f88cfc9e 100644 --- a/src/adapters/web-socket-adapter.ts +++ b/src/adapters/web-socket-adapter.ts @@ -38,7 +38,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter private readonly request: IncomingHttpMessage, private readonly webSocketServer: IWebSocketServerAdapter, private readonly createMessageHandler: Factory, - private readonly slidingWindowRateLimiter: Factory, + private readonly rateLimiter: Factory, private readonly settings: Factory, ) { super() @@ -211,7 +211,7 @@ export class WebSocketAdapter extends EventEmitter implements IWebSocketAdapter return false } - const rateLimiter = this.slidingWindowRateLimiter() + const rateLimiter = this.rateLimiter() const hit = (period: number, rate: number) => rateLimiter.hit(`${client}:message:${period}`, 1, { period, rate }) diff --git a/src/factories/controllers/get-admission-check-controller-factory.ts b/src/factories/controllers/get-admission-check-controller-factory.ts index cdad249c..08703428 100644 --- a/src/factories/controllers/get-admission-check-controller-factory.ts +++ b/src/factories/controllers/get-admission-check-controller-factory.ts @@ -2,7 +2,7 @@ import { getMasterDbClient, getReadReplicaDbClient } from '../../database/client import { createSettings } from '../settings-factory' import { EventRepository } from '../../repositories/event-repository' import { GetSubmissionCheckController } from '../../controllers/admission/get-admission-check-controller' -import { slidingWindowRateLimiterFactory } from '../rate-limiter-factory' +import { rateLimiterFactory } from '../rate-limiter-factory' import { UserRepository } from '../../repositories/user-repository' export const createGetAdmissionCheckController = () => { @@ -11,5 +11,5 @@ export const createGetAdmissionCheckController = () => { const eventRepository = new EventRepository(dbClient, readReplicaDbClient) const userRepository = new UserRepository(dbClient, eventRepository) - return new GetSubmissionCheckController(userRepository, createSettings, slidingWindowRateLimiterFactory) + return new GetSubmissionCheckController(userRepository, createSettings, rateLimiterFactory) } diff --git a/src/factories/controllers/post-invoice-controller-factory.ts b/src/factories/controllers/post-invoice-controller-factory.ts index f92abd1e..966d3027 100644 --- a/src/factories/controllers/post-invoice-controller-factory.ts +++ b/src/factories/controllers/post-invoice-controller-factory.ts @@ -4,7 +4,7 @@ import { createSettings } from '../settings-factory' import { EventRepository } from '../../repositories/event-repository' import { IController } from '../../@types/controllers' import { PostInvoiceController } from '../../controllers/invoices/post-invoice-controller' -import { slidingWindowRateLimiterFactory } from '../rate-limiter-factory' +import { rateLimiterFactory } from '../rate-limiter-factory' import { UserRepository } from '../../repositories/user-repository' export const createPostInvoiceController = (): IController => { @@ -14,5 +14,5 @@ export const createPostInvoiceController = (): IController => { const userRepository = new UserRepository(dbClient, eventRepository) const paymentsService = createPaymentsService() - return new PostInvoiceController(userRepository, paymentsService, createSettings, slidingWindowRateLimiterFactory) + return new PostInvoiceController(userRepository, paymentsService, createSettings, rateLimiterFactory) } diff --git a/src/factories/message-handler-factory.ts b/src/factories/message-handler-factory.ts index e702bf8b..9caf4001 100644 --- a/src/factories/message-handler-factory.ts +++ b/src/factories/message-handler-factory.ts @@ -6,7 +6,7 @@ import { EventMessageHandler } from '../handlers/event-message-handler' import { eventStrategyFactory } from './event-strategy-factory' import { getCacheClient } from '../cache/client' import { RedisAdapter } from '../adapters/redis-adapter' -import { slidingWindowRateLimiterFactory } from './rate-limiter-factory' +import { rateLimiterFactory } from './rate-limiter-factory' import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler' import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler' @@ -33,9 +33,9 @@ export const messageHandlerFactory = eventRepository, userRepository, createSettings, - slidingWindowRateLimiterFactory, nip05VerificationRepository, getCache(), + rateLimiterFactory, ) } case MessageType.REQ: diff --git a/src/factories/rate-limiter-factory.ts b/src/factories/rate-limiter-factory.ts index 5e33f8c7..fbd59638 100644 --- a/src/factories/rate-limiter-factory.ts +++ b/src/factories/rate-limiter-factory.ts @@ -1,3 +1,5 @@ +import { createSettings } from './settings-factory' +import { EWMARateLimiter } from '../utils/ewma-rate-limiter' import { getCacheClient } from '../cache/client' import { ICacheAdapter } from '../@types/adapters' import { IRateLimiter } from '../@types/utils' @@ -6,11 +8,19 @@ import { SlidingWindowRateLimiter } from '../utils/sliding-window-rate-limiter' let instance: IRateLimiter = undefined -export const slidingWindowRateLimiterFactory = () => { +export const rateLimiterFactory = () => { if (!instance) { const cache: ICacheAdapter = new RedisAdapter(getCacheClient()) - instance = new SlidingWindowRateLimiter(cache) + const settings = createSettings() + const strategy = settings.limits?.rateLimiter?.strategy ?? 'ewma' + + if (strategy === 'sliding_window') { + instance = new SlidingWindowRateLimiter(cache) + } else { + instance = new EWMARateLimiter(cache) + } } + return instance -} +} \ No newline at end of file diff --git a/src/factories/websocket-adapter-factory.ts b/src/factories/websocket-adapter-factory.ts index 03dc305a..d40b6aec 100644 --- a/src/factories/websocket-adapter-factory.ts +++ b/src/factories/websocket-adapter-factory.ts @@ -5,7 +5,7 @@ import { IEventRepository, INip05VerificationRepository, IUserRepository } from import { createSettings } from './settings-factory' import { IWebSocketServerAdapter } from '../@types/adapters' import { messageHandlerFactory } from './message-handler-factory' -import { slidingWindowRateLimiterFactory } from './rate-limiter-factory' +import { rateLimiterFactory } from './rate-limiter-factory' import { WebSocketAdapter } from '../adapters/web-socket-adapter' export const webSocketAdapterFactory = @@ -20,6 +20,6 @@ export const webSocketAdapterFactory = request, webSocketServerAdapter, messageHandlerFactory(eventRepository, userRepository, nip05VerificationRepository), - slidingWindowRateLimiterFactory, + rateLimiterFactory, createSettings, ) diff --git a/src/handlers/event-message-handler.ts b/src/handlers/event-message-handler.ts index 2c5fd5e0..eb8fa546 100644 --- a/src/handlers/event-message-handler.ts +++ b/src/handlers/event-message-handler.ts @@ -46,9 +46,9 @@ export class EventMessageHandler implements IMessageHandler { protected readonly eventRepository: IEventRepository, protected readonly userRepository: IUserRepository, private readonly settings: () => Settings, - private readonly slidingWindowRateLimiter: Factory, private readonly nip05VerificationRepository: INip05VerificationRepository, private readonly cache: ICacheAdapter, + private readonly rateLimiter: Factory, ) {} public async handleMessage(message: IncomingEventMessage): Promise { @@ -287,7 +287,7 @@ export class EventMessageHandler implements IMessageHandler { return false } - const rateLimiter = this.slidingWindowRateLimiter() + const rateLimiter = this.rateLimiter() const toString = (input: any | any[]): string => { return Array.isArray(input) ? `[${input.map(toString)}]` : input.toString() diff --git a/src/handlers/request-handlers/rate-limiter-middleware.ts b/src/handlers/request-handlers/rate-limiter-middleware.ts index 72c7da87..9aa08d74 100644 --- a/src/handlers/request-handlers/rate-limiter-middleware.ts +++ b/src/handlers/request-handlers/rate-limiter-middleware.ts @@ -2,8 +2,8 @@ import { NextFunction, Request, Response } from 'express' import { createLogger } from '../../factories/logger-factory' import { createSettings } from '../../factories/settings-factory' import { getRemoteAddress } from '../../utils/http' +import { rateLimiterFactory } from '../../factories/rate-limiter-factory' import { Settings } from '../../@types/settings' -import { slidingWindowRateLimiterFactory } from '../../factories/rate-limiter-factory' const debug = createLogger('rate-limiter-middleware') @@ -34,7 +34,7 @@ export async function isRateLimited(remoteAddress: string, settings: Settings): return false } - const rateLimiter = slidingWindowRateLimiterFactory() + const rateLimiter = rateLimiterFactory() const hit = (period: number, rate: number) => rateLimiter.hit(`${remoteAddress}:connection:${period}`, 1, { period: period, rate: rate }) diff --git a/src/utils/ewma-rate-limiter.ts b/src/utils/ewma-rate-limiter.ts new file mode 100644 index 00000000..0f212c6b --- /dev/null +++ b/src/utils/ewma-rate-limiter.ts @@ -0,0 +1,64 @@ +import { IRateLimiter, IRateLimiterOptions } from '../@types/utils' +import { createLogger } from '../factories/logger-factory' +import { ICacheAdapter } from '../@types/adapters' + +const debug = createLogger('ewma-rate-limiter') + +const rateLimitScript = { + NUMBER_OF_KEYS: 1, + SCRIPT: ` + local key = KEYS[1] + local timestamp = tonumber(ARGV[1]) + local rate = tonumber(ARGV[2]) + local period = tonumber(ARGV[3]) + local R_old = tonumber(redis.call('HGET', key, 'rate')) or 0 + local T_old = tonumber(redis.call('HGET', key, 'timestamp')) or timestamp + + local deltaT = timestamp - T_old + local lambda = math.log(2) / period + local R_new = R_old * math.exp(-lambda * deltaT) + tonumber(ARGV[4]) + + redis.call('HSET', key, 'rate', R_new, 'timestamp', timestamp) + redis.call('EXPIRE', key, math.ceil(period / 1000)) + + if R_new > rate then + return 1 + else + return 0 + end + `, + } + +export const calculateEWMA = ( + rOld: number, + deltaT: number, + period: number, + step: number +): number => { + const lambda = Math.log(2) / period + return rOld * Math.exp(-lambda * deltaT) + step +} + +export class EWMARateLimiter implements IRateLimiter { + public constructor( + private readonly cache: ICacheAdapter, + ) {} + + public async hit( + key: string, + step: number, + options: IRateLimiterOptions, + ): Promise { + const { rate, period } = options + + const result = await this.cache.eval(rateLimitScript.SCRIPT, + [key], + [Date.now().toString(), rate.toString(), period.toString(), step.toString()] + ) + + debug('ewma rate limited on %s bucket: %s', key, result ? 'yes' : 'no') + + return result === 1 + } + +} diff --git a/test/integration/features/rate-limiter/rate-limiter.feature b/test/integration/features/rate-limiter/rate-limiter.feature new file mode 100644 index 00000000..52508e4b --- /dev/null +++ b/test/integration/features/rate-limiter/rate-limiter.feature @@ -0,0 +1,7 @@ +Feature: Rate Limiter + @rate-limiter + Scenario: Alice is rate limited when message rate exceeds the limit + Given someone called Alice + And Alice's message rate is already at the limit + When Alice sends a text_note event expecting to be rate limited + Then Alice receives a notice with rate limited \ No newline at end of file diff --git a/test/integration/features/rate-limiter/rate-limiter.ts b/test/integration/features/rate-limiter/rate-limiter.ts new file mode 100644 index 00000000..ab6b7104 --- /dev/null +++ b/test/integration/features/rate-limiter/rate-limiter.ts @@ -0,0 +1,65 @@ +import { After, Before, Given, When } from '@cucumber/cucumber' +import { assocPath, pipe } from 'ramda' +import { createClient } from 'redis' +import { WebSocket } from 'ws' + +import { createEvent } from '../helpers' +import { Event } from '../../../../src/@types/event' +import { getCacheConfig } from '../../../../src/cache/client' +import { SettingsStatic } from '../../../../src/utils/settings' + +let testCacheClient: any +const rateLimitKeys: string[] = [] + +Before({ tags: '@rate-limiter' }, async function() { + testCacheClient = createClient(getCacheConfig()) + await testCacheClient.connect() + SettingsStatic._settings = pipe( + assocPath(['limits', 'rateLimiter', 'strategy'], 'ewma'), + assocPath(['limits', 'message', 'rateLimits'], [{ period: 60000, rate: 10 }]), + assocPath(['limits', 'message', 'ipWhitelist'], []), + )(SettingsStatic._settings) as any +}) + +Given(/(\w+)'s message rate is already at the limit/, async function(name: string) { + const ws = this.parameters.clients[name] as WebSocket + const address = (ws as any)._socket?.remoteAddress ?? '::1' + const period = 60000 + const key = `${address}:message:${period}` + + await testCacheClient.hSet(key, { + rate: '999', + timestamp: Date.now().toString(), + }) + + rateLimitKeys.push(key) +}) + +When(/(\w+) sends a text_note event expecting to be rate limited/, async function(name: string) { + const ws = this.parameters.clients[name] as WebSocket + const { pubkey, privkey } = this.parameters.identities[name] + + const event: Event = await createEvent({ pubkey, kind: 1, content: 'hello' }, privkey) + + await new Promise((resolve, reject) => { + ws.send(JSON.stringify(['EVENT', event]), (err) => { + if (err) reject(err) + else resolve() + }) + }) + + this.parameters.events[name].push(event) +}) + +After({ tags: '@rate-limiter' }, async function() { + SettingsStatic._settings = pipe( + assocPath(['limits', 'message', 'rateLimits'], []), + assocPath(['limits', 'message', 'ipWhitelist'], ['::1', '10.10.10.1', '::ffff:10.10.10.1']), + )(SettingsStatic._settings) as any + + for (const key of rateLimitKeys) { + await testCacheClient.del(key) + } + rateLimitKeys.length = 0 + await testCacheClient.disconnect() +}) diff --git a/test/unit/adapters/redis-adapter.spec.ts b/test/unit/adapters/redis-adapter.spec.ts index c630e50c..f9db2f85 100644 --- a/test/unit/adapters/redis-adapter.spec.ts +++ b/test/unit/adapters/redis-adapter.spec.ts @@ -34,6 +34,12 @@ describe('RedisAdapter', () => { zAdd: sandbox.stub(), removeListener: sandbox.stub(), once: sandbox.stub(), + del: sandbox.stub(), + hGet: sandbox.stub(), + hSet: sandbox.stub(), + scriptLoad: sandbox.stub(), + evalSha: sandbox.stub(), + isOpen: false, } adapter = new RedisAdapter(client) @@ -193,4 +199,69 @@ describe('RedisAdapter', () => { expect(result).to.equal(1) }) }) + + describe('deleteKey', () => { + it('calls client.del with the key and returns the result', async () => { + client.del.resolves(1) + + const result = await adapter.deleteKey('test-key') + + expect(client.del).to.have.been.calledOnceWithExactly('test-key') + expect(result).to.equal(1) + }) + }) + + describe('getHKey', () => { + it('calls client.hGet with key and field and returns the value', async () => { + client.hGet.resolves('test-value') + + const result = await adapter.getHKey('test-key', 'test-field') + + expect(client.hGet).to.have.been.calledOnceWithExactly('test-key', 'test-field') + expect(result).to.equal('test-value') + }) + + it('returns empty string when field does not exist', async () => { + client.hGet.resolves(undefined) + + const result = await adapter.getHKey('test-key', 'missing-field') + + expect(result).to.equal('') + }) + }) + + describe('setHKey', () => { + it('calls client.hSet with key and fields and returns true', async () => { + client.hSet.resolves(1) + + const result = await adapter.setHKey('test-key', { field1: 'value1', field2: 'value2' }) + + expect(client.hSet).to.have.been.calledOnceWithExactly('test-key', { field1: 'value1', field2: 'value2' }) + expect(result).to.be.true + }) + }) + + describe('eval', () => { + it('loads script on first call and uses evalSha', async () => { + client.scriptLoad.resolves('abc123sha') + client.evalSha.resolves(1) + + const result = await adapter.eval('local x = 1', ['key1'], ['arg1']) + + expect(client.scriptLoad).to.have.been.calledOnce + expect(client.evalSha).to.have.been.calledOnceWithExactly('abc123sha', { keys: ['key1'], arguments: ['arg1'] }) + expect(result).to.equal(1) + }) + + it('reuses cached SHA on subsequent calls', async () => { + client.scriptLoad.resolves('abc123sha') + client.evalSha.resolves(0) + + await adapter.eval('local x = 1', ['key1'], ['arg1']) + await adapter.eval('local x = 1', ['key1'], ['arg1']) + + expect(client.scriptLoad).to.have.been.calledOnce + expect(client.evalSha).to.have.been.calledTwice + }) + }) }) diff --git a/test/unit/handlers/event-message-handler.spec.ts b/test/unit/handlers/event-message-handler.spec.ts index 227d19bc..fb4610c3 100644 --- a/test/unit/handlers/event-message-handler.spec.ts +++ b/test/unit/handlers/event-message-handler.spec.ts @@ -99,9 +99,9 @@ describe('EventMessageHandler', () => { ({ info: { relay_url: 'relay_url' }, }) as any, - () => ({ hit: async () => false }), {} as any, { hasKey: async () => false, setKey: async () => true } as any, + () => ({ hit: async () => false }), ) }) @@ -269,9 +269,9 @@ describe('EventMessageHandler', () => { {} as any, userRepository, () => settings, - () => ({ hit: async () => false }), {} as any, { hasKey: async () => false, setKey: async () => true } as any, + () => ({ hit: async () => false }), ) }) @@ -717,9 +717,9 @@ describe('EventMessageHandler', () => { {} as any, userRepository, () => settings, - () => ({ hit: rateLimiterHitStub }), {} as any, { hasKey: async () => false, setKey: async () => true } as any, + () => ({ hit: rateLimiterHitStub }), ) }) @@ -992,9 +992,9 @@ describe('EventMessageHandler', () => { {} as any, userRepository, () => settings, - () => ({ hit: async () => false }), {} as any, cacheStub, + () => ({ hit: async () => false }), ) }) @@ -1197,9 +1197,9 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }), nip05VerificationRepository, { hasKey: async () => false, setKey: async () => true, getKey: async () => null } as any, + () => ({ hit: async () => false }), ) }) @@ -1373,9 +1373,9 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }), nip05VerificationRepository, { hasKey: async () => false, setKey: async () => true, getKey: async () => null } as any, + () => ({ hit: async () => false }), ) }) @@ -1604,9 +1604,9 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }), nip05VerificationRepository, { hasKey: async () => false, setKey: async () => true, getKey: async () => null } as any, + () => ({ hit: async () => false }), ) }) @@ -1780,9 +1780,9 @@ describe('EventMessageHandler', () => { { hasActiveRequestToVanish: async () => false } as any, userRepository, () => settings, - () => ({ hit: async () => false }), nip05VerificationRepository, { hasKey: async () => false, setKey: async () => true, getKey: async () => null } as any, + () => ({ hit: async () => false }), ) }) @@ -1969,4 +1969,4 @@ describe('EventMessageHandler', () => { expect(nip05VerificationRepository.upsert).to.have.been.calledOnce }) }) -}) +}) \ No newline at end of file diff --git a/test/unit/utils/ewma-rate-limiter.spec.ts b/test/unit/utils/ewma-rate-limiter.spec.ts new file mode 100644 index 00000000..53ec8f62 --- /dev/null +++ b/test/unit/utils/ewma-rate-limiter.spec.ts @@ -0,0 +1,72 @@ +import { expect } from 'chai' +import Sinon from 'sinon' + +import { calculateEWMA, EWMARateLimiter } from '../../../src/utils/ewma-rate-limiter' +import { ICacheAdapter } from '../../../src/@types/adapters' +import { IRateLimiter } from '../../../src/@types/utils' + +describe('EWMARateLimiter', () => { + let clock: Sinon.SinonFakeTimers + let cache: ICacheAdapter + let rateLimiter: IRateLimiter + + let evalStub: Sinon.SinonStub + let sandbox: Sinon.SinonSandbox + + beforeEach(() => { + sandbox = Sinon.createSandbox() + clock = sandbox.useFakeTimers(1665546189000) + evalStub = sandbox.stub() + cache = { + eval: evalStub, + } as unknown as ICacheAdapter + rateLimiter = new EWMARateLimiter(cache) + }) + + afterEach(() => { + clock.restore() + sandbox.restore() + }) + describe('calculateEWMA', () => { + it('returns 1 on first request with no history', () => { + const result = calculateEWMA(0, 0, 120000, 1) + expect(result).to.equal(1) + }) + + it('increases rate on burst requests with no time gap', () => { + const first = calculateEWMA(0, 0, 120000, 1) + const second = calculateEWMA(first, 0, 120000, 1) + const third = calculateEWMA(second, 0, 120000, 1) + + expect(third).to.be.greaterThan(first) + }) + + it('decays rate after time gap', () => { + const rateAfterBurst = calculateEWMA(10, 0, 120000, 1) + const rateAfterGap = calculateEWMA(rateAfterBurst, 120000, 120000, 1) + + expect(rateAfterGap).to.be.lessThan(rateAfterBurst) + }) + + it('rate approaches 1 after very long inactivity', () => { + const rateAfterBurst = calculateEWMA(10, 0, 120000, 1) + const rateAfterLongGap = calculateEWMA(rateAfterBurst, 9999999, 120000, 1) + + expect(rateAfterLongGap).to.be.closeTo(1, 0.001) + }) + }) + + describe('hit', () => { + it('returns false on first request', async () => { + evalStub.resolves(0) + const result = await rateLimiter.hit('key', 1, { period: 120000, rate: 10 }) + expect(result).to.be.false + }) + + it('returns true when rate limit exceeded', async () => { + evalStub.resolves(1) + const result = await rateLimiter.hit('key', 1, { period: 120000, rate: 10 }) + expect(result).to.be.true + }) + }) +}) diff --git a/test/unit/utils/sliding-window-rate-limiter.spec.ts b/test/unit/utils/sliding-window-rate-limiter.spec.ts index 7e48495f..87cb75a4 100644 --- a/test/unit/utils/sliding-window-rate-limiter.spec.ts +++ b/test/unit/utils/sliding-window-rate-limiter.spec.ts @@ -38,7 +38,7 @@ describe('SlidingWindowRateLimiter', () => { getKey: getKeyStub, hasKey: hasKeyStub, setKey: setKeyStub, - } + } as unknown as ICacheAdapter rateLimiter = new SlidingWindowRateLimiter(cache) })