From bbe1c352e55ef88bba5aa0fc34db15e7687ec8ae Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sun, 9 Nov 2025 02:03:07 +0100 Subject: [PATCH 1/2] feat: supporting policies for TTL instead of queue-level TTL, with default false Signed-off-by: Fawzi Essam --- src/core/RunMQ.ts | 4 +- src/core/constants/index.ts | 1 + src/core/consumer/ConsumerCreatorUtils.ts | 4 + src/core/consumer/RunMQConsumerCreator.ts | 48 ++- src/core/exceptions/Exceptions.ts | 1 + .../Policies/RabbitMQMessageTTLPolicy.ts | 17 + .../Policies/RunMQTTLPolicyManager.ts | 62 ++++ .../management/RabbitMQManagementClient.ts | 117 +++++++ src/core/message/RabbitMQMessage.ts | 2 +- src/core/utils/{Utils.ts => RunMQUtils.ts} | 4 + src/index.ts | 3 +- src/types/index.ts | 35 ++ .../RabbitMQManagementConfigExample.ts | 33 ++ .../RabbitMQOperatorPolicyExamples.ts | 11 + .../Examples/RunMQConnectionConfigExample.ts | 10 + tests/Examples/RunMQMessageExample.ts | 2 +- .../RunMQProcessorConfigurationExample.ts | 11 +- .../e2e/RabbitMQManagementClient.e2e.test.ts | 312 ++++++++++++++++++ tests/e2e/RunMQ.processing.e2e.test.ts | 64 +++- tests/e2e/RunMQ.publisher.e2e.test.ts | 2 +- tests/e2e/RunMQTTLPolicyManager.e2e.test.ts | 193 +++++++++++ tests/mocks/MockedRabbitMQMessage.ts | 2 +- tests/unit/core/RunMQ.test.ts | 9 +- .../consumer/RunMQConsumerCreator.test.ts | 53 ++- .../RabbitMQManagementClient.test.ts | 147 +++++++++ .../management/RunMQTTLPolicyManager.test.ts | 138 ++++++++ 26 files changed, 1261 insertions(+), 24 deletions(-) create mode 100644 src/core/management/Policies/RabbitMQMessageTTLPolicy.ts create mode 100644 src/core/management/Policies/RunMQTTLPolicyManager.ts create mode 100644 src/core/management/RabbitMQManagementClient.ts rename src/core/utils/{Utils.ts => RunMQUtils.ts} (84%) create mode 100644 tests/Examples/RabbitMQManagementConfigExample.ts create mode 100644 tests/Examples/RabbitMQOperatorPolicyExamples.ts create mode 100644 tests/e2e/RabbitMQManagementClient.e2e.test.ts create mode 100644 tests/e2e/RunMQTTLPolicyManager.e2e.test.ts create mode 100644 tests/unit/core/management/RabbitMQManagementClient.test.ts create mode 100644 tests/unit/core/management/RunMQTTLPolicyManager.test.ts diff --git a/src/core/RunMQ.ts b/src/core/RunMQ.ts index 741812c..f752366 100644 --- a/src/core/RunMQ.ts +++ b/src/core/RunMQ.ts @@ -2,7 +2,7 @@ import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunM import {RunMQException} from "@src/core/exceptions/RunMQException"; import {AmqplibClient} from "@src/core/clients/AmqplibClient"; import {Exceptions} from "@src/core/exceptions/Exceptions"; -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {Constants, DEFAULTS} from "@src/core/constants"; import {Channel} from "amqplib"; import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator"; @@ -51,7 +51,7 @@ export class RunMQ { * @param processor The function that will process the incoming messages */ public async process>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent) => Promise) { - const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger); + const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger, this.config.management); await consumer.createConsumer(new ConsumerConfiguration(topic, config, processor)) } diff --git a/src/core/constants/index.ts b/src/core/constants/index.ts index f01535f..eda2e29 100644 --- a/src/core/constants/index.ts +++ b/src/core/constants/index.ts @@ -4,6 +4,7 @@ export const Constants = { DEAD_LETTER_ROUTER_EXCHANGE_NAME: RUNMQ_PREFIX + "dead_letter_router", RETRY_DELAY_QUEUE_PREFIX: RUNMQ_PREFIX + "retry_delay_", DLQ_QUEUE_PREFIX: RUNMQ_PREFIX + "dlq_", + MESSAGE_TTL_OPERATOR_POLICY_PREFIX: RUNMQ_PREFIX + "message_ttl_operator_policy", } export const DEFAULTS = { diff --git a/src/core/consumer/ConsumerCreatorUtils.ts b/src/core/consumer/ConsumerCreatorUtils.ts index 6f2b73a..25e2612 100644 --- a/src/core/consumer/ConsumerCreatorUtils.ts +++ b/src/core/consumer/ConsumerCreatorUtils.ts @@ -7,4 +7,8 @@ export class ConsumerCreatorUtils { static getRetryDelayTopicName(topic: string): string { return Constants.RETRY_DELAY_QUEUE_PREFIX + topic; } + + static getMessageTTLPolicyName(topic: string): string { + return Constants.MESSAGE_TTL_OPERATOR_POLICY_PREFIX + topic; + } } \ No newline at end of file diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index f5c3b99..973adf0 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -14,18 +14,25 @@ import {RunMQLogger} from "@src/core/logging/RunMQLogger"; import {DefaultDeserializer} from "@src/core/serializers/deserializer/DefaultDeserializer"; import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator"; -import {AMQPClient} from "@src/types"; +import {AMQPClient, RabbitMQManagementConfig} from "@src/types"; +import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RunMQException} from "@src/core/exceptions/RunMQException"; +import {Exceptions} from "@src/core/exceptions/Exceptions"; export class RunMQConsumerCreator { + private ttlPolicyManager: RunMQTTLPolicyManager; + constructor( private defaultChannel: Channel, private client: AMQPClient, private logger: RunMQLogger, + managementConfig?: RabbitMQManagementConfig ) { + this.ttlPolicyManager = new RunMQTTLPolicyManager(logger, managementConfig); } - public async createConsumer(consumerConfiguration: ConsumerConfiguration) { + await this.ttlPolicyManager.initialize(); await this.assertQueues(consumerConfiguration); await this.bindQueues(consumerConfiguration); for (let i = 0; i < consumerConfiguration.processorConfig.consumersCount; i++) { @@ -78,16 +85,43 @@ export class RunMQConsumerCreator { deadLetterExchange: Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME, deadLetterRoutingKey: consumerConfiguration.processorConfig.name }); - await this.defaultChannel.assertQueue(ConsumerCreatorUtils.getRetryDelayTopicName(consumerConfiguration.processorConfig.name), { - durable: true, - deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME, - messageTtl: consumerConfiguration.processorConfig.attemptsDelay ?? DEFAULTS.PROCESSING_RETRY_DELAY, - }); await this.defaultChannel.assertQueue(ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name), { durable: true, deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME, deadLetterRoutingKey: consumerConfiguration.processorConfig.name }); + + const retryDelayQueueName = ConsumerCreatorUtils.getRetryDelayTopicName(consumerConfiguration.processorConfig.name); + const messageDelay = consumerConfiguration.processorConfig.attemptsDelay ?? DEFAULTS.PROCESSING_RETRY_DELAY + + + const policiesForTTL = consumerConfiguration.processorConfig.usePoliciesForTTL ?? false; + if (!policiesForTTL) { + await this.defaultChannel.assertQueue(retryDelayQueueName, { + durable: true, + deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME, + messageTtl: messageDelay, + }); + return; + } + + const result = await this.ttlPolicyManager.apply( + retryDelayQueueName, + messageDelay + ); + if (result) { + await this.defaultChannel.assertQueue(retryDelayQueueName, { + durable: true, + deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME + }); + return; + } + throw new RunMQException( + Exceptions.FAILURE_TO_DEFINE_TTL_POLICY, + { + error: "Failed to apply TTL policy to queue: " + retryDelayQueueName + } + ); } diff --git a/src/core/exceptions/Exceptions.ts b/src/core/exceptions/Exceptions.ts index 875d705..23d08f1 100644 --- a/src/core/exceptions/Exceptions.ts +++ b/src/core/exceptions/Exceptions.ts @@ -4,4 +4,5 @@ export class Exceptions { public static NOT_INITIALIZED = 'NOT_INITIALIZED'; public static INVALID_MESSAGE_FORMAT = 'MESSAGE_SHOULD_BE_VALID_RECORD'; public static UNSUPPORTED_SCHEMA = 'UNSUPPORTED_SCHEMA'; + public static FAILURE_TO_DEFINE_TTL_POLICY = 'FAILURE_TO_DEFINE_TTL_POLICY'; } \ No newline at end of file diff --git a/src/core/management/Policies/RabbitMQMessageTTLPolicy.ts b/src/core/management/Policies/RabbitMQMessageTTLPolicy.ts new file mode 100644 index 0000000..3d047b5 --- /dev/null +++ b/src/core/management/Policies/RabbitMQMessageTTLPolicy.ts @@ -0,0 +1,17 @@ +import {RabbitMQOperatorPolicy} from "@src/types"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; + +export class RabbitMQMessageTTLPolicy { + static createFor(queueName: string, ttl: number): RabbitMQOperatorPolicy { + return { + name: ConsumerCreatorUtils.getMessageTTLPolicyName(queueName), + pattern: RunMQUtils.escapeRegExp(queueName), + definition: { + "message-ttl": ttl + }, + "apply-to": "queues", + priority: 1000 + } + } +} \ No newline at end of file diff --git a/src/core/management/Policies/RunMQTTLPolicyManager.ts b/src/core/management/Policies/RunMQTTLPolicyManager.ts new file mode 100644 index 0000000..687d092 --- /dev/null +++ b/src/core/management/Policies/RunMQTTLPolicyManager.ts @@ -0,0 +1,62 @@ +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RunMQLogger} from "@src/core/logging/RunMQLogger"; +import {DEFAULTS} from "@src/core/constants"; +import {RabbitMQManagementConfig} from "@src"; +import {RabbitMQMessageTTLPolicy} from "@src/core/management/Policies/RabbitMQMessageTTLPolicy"; + +export class RunMQTTLPolicyManager { + private readonly managementClient: RabbitMQManagementClient | null = null; + private isManagementPluginEnabled = false; + + constructor( + private logger: RunMQLogger, + private managementConfig?: RabbitMQManagementConfig + ) { + if (this.managementConfig) { + this.managementClient = new RabbitMQManagementClient(this.managementConfig, this.logger); + } + } + + public async initialize(): Promise { + if (!this.managementClient) { + this.logger.warn("Management client not configured"); + return; + } + + this.isManagementPluginEnabled = await this.managementClient.checkManagementPluginEnabled(); + + if (!this.isManagementPluginEnabled) { + this.logger.warn("RabbitMQ management plugin is not enabled"); + } else { + this.logger.info("RabbitMQ management plugin is enabled"); + } + } + + public async apply( + queueName: string, + ttl?: number, + vhost: string = "%2F" + ): Promise { + const actualTTL = ttl ?? DEFAULTS.PROCESSING_RETRY_DELAY; + + if (this.isManagementPluginEnabled && this.managementClient) { + const success = await this.managementClient.createOrUpdateOperatorPolicy( + vhost, + RabbitMQMessageTTLPolicy.createFor(queueName, actualTTL) + ); + + if (success) { + return true + } + } + return false; + } + + + public async cleanup(queueName: string, vhost: string = "%2F"): Promise { + if (this.isManagementPluginEnabled && this.managementClient) { + const policyName = `ttl-policy-${queueName}`; + await this.managementClient.deleteOperatorPolicy(vhost, policyName); + } + } +} \ No newline at end of file diff --git a/src/core/management/RabbitMQManagementClient.ts b/src/core/management/RabbitMQManagementClient.ts new file mode 100644 index 0000000..597a77b --- /dev/null +++ b/src/core/management/RabbitMQManagementClient.ts @@ -0,0 +1,117 @@ +import {RunMQLogger} from "@src/core/logging/RunMQLogger"; +import {RabbitMQManagementConfig} from "@src"; +import {RabbitMQOperatorPolicy} from "@src/types"; + +export class RabbitMQManagementClient { + constructor( + private config: RabbitMQManagementConfig, + private logger: RunMQLogger + ) {} + + private getAuthHeader(): string { + const credentials = Buffer.from(`${this.config.username}:${this.config.password}`).toString('base64'); + return `Basic ${credentials}`; + } + + public async createOrUpdateOperatorPolicy(vhost: string, policy: RabbitMQOperatorPolicy): Promise { + try { + const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policy.name)}`; + + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Authorization': this.getAuthHeader() + }, + body: JSON.stringify({ + pattern: policy.pattern, + definition: policy.definition, + priority: policy.priority || 0, + "apply-to": policy["apply-to"] + }) + }); + + if (!response.ok) { + const error = await response.text(); + this.logger.error(`Failed to create operator policy: ${response.status} - ${error}`); + return false; + } + + this.logger.info(`Successfully set operator policy: ${policy.name}`); + return true; + } catch (error) { + this.logger.error(`Error creating operator policy: ${error}`); + return false; + } + } + + public async getOperatorPolicy(vhost: string, policyName: string): Promise { + try { + const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + + if (!response.ok) { + if (response.status === 404) { + return null; + } + const error = await response.text(); + this.logger.error(`Failed to get operator policy: ${response.status} - ${error}`); + return null; + } + + return await response.json(); + } catch (error) { + this.logger.error(`Error getting operator policy: ${error}`); + return null; + } + } + + public async deleteOperatorPolicy(vhost: string, policyName: string): Promise { + try { + const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`; + + const response = await fetch(url, { + method: 'DELETE', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + + if (!response.ok && response.status !== 404) { + const error = await response.text(); + this.logger.error(`Failed to delete operator policy: ${response.status} - ${error}`); + return false; + } + + this.logger.info(`Successfully deleted operator policy: ${policyName}`); + return true; + } catch (error) { + this.logger.error(`Error deleting operator policy: ${error}`); + return false; + } + } + + public async checkManagementPluginEnabled(): Promise { + try { + const url = `${this.config.url}/api/overview`; + + const response = await fetch(url, { + method: 'GET', + headers: { + 'Authorization': this.getAuthHeader() + } + }); + + return response.ok; + } catch (error) { + this.logger.warn(`Management plugin not accessible: ${error}`); + return false; + } + } +} \ No newline at end of file diff --git a/src/core/message/RabbitMQMessage.ts b/src/core/message/RabbitMQMessage.ts index f300009..37335bd 100644 --- a/src/core/message/RabbitMQMessage.ts +++ b/src/core/message/RabbitMQMessage.ts @@ -1,5 +1,5 @@ import {Channel} from "amqplib"; -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties"; import {AMQPMessage} from "@src/core/message/AmqpMessage"; diff --git a/src/core/utils/Utils.ts b/src/core/utils/RunMQUtils.ts similarity index 84% rename from src/core/utils/Utils.ts rename to src/core/utils/RunMQUtils.ts index e03cfe6..448912e 100644 --- a/src/core/utils/Utils.ts +++ b/src/core/utils/RunMQUtils.ts @@ -16,4 +16,8 @@ export class RunMQUtils { throw new RunMQException(Exceptions.INVALID_MESSAGE_FORMAT, {}); } } + + public static escapeRegExp(string: string): string { + return string.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + } } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 5dabfcc..fbf94b6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,5 +6,6 @@ export { RunMQProcessorConfiguration, MessageSchema, RunMQMessageContent, - RunMQMessageMetaContent + RunMQMessageMetaContent, + RabbitMQManagementConfig, } from "./types"; \ No newline at end of file diff --git a/src/types/index.ts b/src/types/index.ts index 53d9e5e..bd8c4fa 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -26,6 +26,15 @@ export interface RunMQConnectionConfig { * Default is 10 attempts. */ maxReconnectAttempts?: number; + /** + * Optional configuration for RabbitMQ management HTTP API. + * If provided, policies will be used for TTL instead of queue-level TTL. + */ + management?: { + url: string; + username: string; + password: string; + }; } export type SchemaFailureStrategy = 'dlq' @@ -54,6 +63,32 @@ export interface RunMQProcessorConfiguration { * @see MessageSchema */ messageSchema?: MessageSchema + + /** + * Whether to use RabbitMQ policies for setting TTL instead of queue-level TTL. + * Requires management configuration to be provided in RunMQConnectionConfig. + * Default is false. + * + * Recommended to use it for flexibility. + */ + usePoliciesForTTL?: boolean; +} + +export interface RabbitMQManagementConfig { + url: string; + username: string; + password: string; +} + +export interface RabbitMQOperatorPolicy { + name: string; + pattern: string; + definition: { + "message-ttl"?: number; + [key: string]: any; + }; + priority?: number; + "apply-to": "queues" | "exchanges" | "all"; } export interface RunMQMessageContent { diff --git a/tests/Examples/RabbitMQManagementConfigExample.ts b/tests/Examples/RabbitMQManagementConfigExample.ts new file mode 100644 index 0000000..ca479f8 --- /dev/null +++ b/tests/Examples/RabbitMQManagementConfigExample.ts @@ -0,0 +1,33 @@ +export class RabbitMQManagementConfigExample { + static valid() { + return { + url: 'http://localhost:15673', + username: 'test', + password: 'test', + }; + } + + static invalid() { + return { + url: 'http://invalid-host:15673', + username: 'invalid', + password: 'invalid', + }; + } + + static invalidCredentials() { + return { + url: 'http://localhost:15673', + username: 'wrong', + password: 'wrong', + }; + } + + static nonRoutableHost() { + return { + url: 'http://1.2.3.4:15672', + username: 'test', + password: 'test', + } + } +} \ No newline at end of file diff --git a/tests/Examples/RabbitMQOperatorPolicyExamples.ts b/tests/Examples/RabbitMQOperatorPolicyExamples.ts new file mode 100644 index 0000000..8ba5b96 --- /dev/null +++ b/tests/Examples/RabbitMQOperatorPolicyExamples.ts @@ -0,0 +1,11 @@ +import {RabbitMQMessageTTLPolicy} from "@src/core/management/Policies/RabbitMQMessageTTLPolicy"; +import {faker} from "@faker-js/faker"; + +export class RabbitMQMessageTTLPolicyExample { + static validPolicy( + queueName: string = faker.lorem.word(), + ttl: number = 5000 + ) { + return RabbitMQMessageTTLPolicy.createFor(queueName, ttl); + } +} \ No newline at end of file diff --git a/tests/Examples/RunMQConnectionConfigExample.ts b/tests/Examples/RunMQConnectionConfigExample.ts index 58ea272..5358926 100644 --- a/tests/Examples/RunMQConnectionConfigExample.ts +++ b/tests/Examples/RunMQConnectionConfigExample.ts @@ -1,5 +1,6 @@ import {RunMQConnectionConfig} from "@src/types"; import {faker} from "@faker-js/faker"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; export class RunMQConnectionConfigExample { static random( @@ -23,6 +24,15 @@ export class RunMQConnectionConfigExample { ); } + static validWithManagement(): RunMQConnectionConfig { + return { + url: 'amqp://test:test@localhost:5673', + reconnectDelay: 100, + maxReconnectAttempts: 3, + management: RabbitMQManagementConfigExample.valid() + } + } + static invalid(): RunMQConnectionConfig { return this.random( 'amqp://invalid:invalid@localhost:9999', diff --git a/tests/Examples/RunMQMessageExample.ts b/tests/Examples/RunMQMessageExample.ts index 043bc28..97d93ff 100644 --- a/tests/Examples/RunMQMessageExample.ts +++ b/tests/Examples/RunMQMessageExample.ts @@ -1,6 +1,6 @@ import {MessageExample} from "@tests/Examples/MessageExample"; import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage"; -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; export class RunMQMessageExample { static random( diff --git a/tests/Examples/RunMQProcessorConfigurationExample.ts b/tests/Examples/RunMQProcessorConfigurationExample.ts index a56c6c3..95c5121 100644 --- a/tests/Examples/RunMQProcessorConfigurationExample.ts +++ b/tests/Examples/RunMQProcessorConfigurationExample.ts @@ -7,7 +7,8 @@ export class RunMQProcessorConfigurationExample { consumersCount = faker.number.int({min: 1, max: 10}), attempts = faker.number.int({min: 0, max: 10}), attemptsDelay = faker.number.int({min: 10, max: 1000}), - messageSchema = MessageSchemaExample.random() + messageSchema = MessageSchemaExample.random(), + usePoliciesForTTL: boolean = false ): RunMQProcessorConfiguration { return { name, @@ -15,22 +16,26 @@ export class RunMQProcessorConfigurationExample { attempts, attemptsDelay, messageSchema, + usePoliciesForTTL } } static simpleNoSchema( name: string = faker.lorem.word(), - consumersCount: number = 1 + consumersCount: number = 1, + usePoliciesForTTL: boolean = false ): RunMQProcessorConfiguration { return this.random( name, consumersCount, 3, 100, - undefined + undefined, + usePoliciesForTTL ); } + static simpleWithPersonSchema(attempts: number = 3): RunMQProcessorConfiguration { return this.random( 'person_processor', diff --git a/tests/e2e/RabbitMQManagementClient.e2e.test.ts b/tests/e2e/RabbitMQManagementClient.e2e.test.ts new file mode 100644 index 0000000..54e5575 --- /dev/null +++ b/tests/e2e/RabbitMQManagementClient.e2e.test.ts @@ -0,0 +1,312 @@ +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; +import {RabbitMQMessageTTLPolicyExample} from "@tests/Examples/RabbitMQOperatorPolicyExamples"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {RabbitMQOperatorPolicy} from "@src/types"; + +describe('RabbitMQManagementClient E2E Tests', () => { + const validConfig = RabbitMQManagementConfigExample.valid(); + let managementClient: RabbitMQManagementClient; + + beforeEach(() => { + jest.clearAllMocks(); + managementClient = new RabbitMQManagementClient(validConfig, MockedRunMQLogger); + }); + + afterEach(async () => { + const testPolicies = [ + "test-policy-1", + "test-policy-2", + "test-policy-update", + "test-policy-delete", + "test-priority-policy", + "test-pattern-policy", + "ttl-policy-test-queue-e2e" + ]; + + for (const policyName of testPolicies) { + await managementClient.deleteOperatorPolicy("%2F", policyName); + } + + await RunMQUtils.delay(100); + }); + + describe('Management Plugin Check', () => { + it('should successfully detect when management plugin is enabled', async () => { + const isEnabled = await managementClient.checkManagementPluginEnabled(); + + expect(isEnabled).toBe(true); + }); + + it('should handle connection to non-existent management endpoint', async () => { + const invalidClient = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.invalid(), + MockedRunMQLogger + ); + + const isEnabled = await invalidClient.checkManagementPluginEnabled(); + + expect(isEnabled).toBe(false); + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + expect.stringContaining("Management plugin not accessible") + ); + }); + + it('should handle authentication failure', async () => { + const unauthorizedClient = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.invalidCredentials(), + MockedRunMQLogger + ); + + const isEnabled = await unauthorizedClient.checkManagementPluginEnabled(); + + expect(isEnabled).toBe(false); + }); + }); + + describe('Operator Policy Creation', () => { + it('should create a new operator policy successfully', async () => { + const policy = RabbitMQMessageTTLPolicyExample.validPolicy("test-queue-e2e", 10000); + + const result = await managementClient.createOrUpdateOperatorPolicy("%2F", policy); + + expect(result).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully set operator policy") + ); + + const retrievedPolicy = await managementClient.getOperatorPolicy("%2F", policy.name); + expect(retrievedPolicy).not.toBeNull(); + expect(retrievedPolicy?.pattern).toBe(policy.pattern); + expect(retrievedPolicy?.definition["message-ttl"]).toBe(10000); + }); + + it('should create policy with custom pattern and definition', async () => { + const customPolicy: RabbitMQOperatorPolicy = { + name: "test-pattern-policy", + pattern: "^test\\..*", + definition: { + "message-ttl": 5000, + "max-length": 100 + }, + "apply-to": "queues", + priority: 10 + }; + + const result = await managementClient.createOrUpdateOperatorPolicy("%2F", customPolicy); + + expect(result).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully set operator policy: test-pattern-policy") + ); + + const retrievedPolicy = await managementClient.getOperatorPolicy("%2F", "test-pattern-policy"); + expect(retrievedPolicy).not.toBeNull(); + expect(retrievedPolicy?.pattern).toBe("^test\\..*"); + expect(retrievedPolicy?.definition["message-ttl"]).toBe(5000); + expect(retrievedPolicy?.definition["max-length"]).toBe(100); + expect(retrievedPolicy?.priority).toBe(10); + }); + + it('should update an existing policy', async () => { + const initialPolicy: RabbitMQOperatorPolicy = { + name: "test-policy-update", + pattern: "update-queue", + definition: { + "message-ttl": 5000 + }, + "apply-to": "queues" + }; + + await managementClient.createOrUpdateOperatorPolicy("%2F", initialPolicy); + await RunMQUtils.delay(100); + + const initialRetrieved = await managementClient.getOperatorPolicy("%2F", "test-policy-update"); + expect(initialRetrieved?.definition["message-ttl"]).toBe(5000); + + const updatedPolicy: RabbitMQOperatorPolicy = { + ...initialPolicy, + definition: { + "message-ttl": 10000 + } + }; + + const updateResult = await managementClient.createOrUpdateOperatorPolicy("%2F", updatedPolicy); + + expect(updateResult).toBe(true); + + const updatedRetrieved = await managementClient.getOperatorPolicy("%2F", "test-policy-update"); + expect(updatedRetrieved?.definition["message-ttl"]).toBe(10000); + }); + }); + + describe('Operator Policy Retrieval', () => { + it('should retrieve an existing operator policy', async () => { + const policy: RabbitMQOperatorPolicy = { + name: "test-policy-1", + pattern: "test-queue", + definition: { + "message-ttl": 5000 + }, + "apply-to": "queues" + }; + + await managementClient.createOrUpdateOperatorPolicy("%2F", policy); + await RunMQUtils.delay(100); + + const retrievedPolicy = await managementClient.getOperatorPolicy("%2F", "test-policy-1"); + + expect(retrievedPolicy).not.toBeNull(); + expect(retrievedPolicy?.name).toBe("test-policy-1"); + expect(retrievedPolicy?.pattern).toBe("test-queue"); + expect(retrievedPolicy?.definition["message-ttl"]).toBe(5000); + expect(retrievedPolicy?.["apply-to"]).toBe("queues"); + }); + + it('should return null for non-existent policy', async () => { + const retrievedPolicy = await managementClient.getOperatorPolicy("%2F", "non-existent-policy"); + + expect(retrievedPolicy).toBeNull(); + }); + + it('should handle errors when retrieving policy', async () => { + const retrievedPolicy = await managementClient.getOperatorPolicy("invalid%vhost", "test-policy"); + + expect(retrievedPolicy).toBeNull(); + expect(MockedRunMQLogger.error).toHaveBeenCalledWith( + expect.stringContaining("Failed to get operator policy") + ); + }); + }); + + describe('Operator Policy Deletion', () => { + it('should delete an existing operator policy successfully', async () => { + const policy: RabbitMQOperatorPolicy = { + name: "test-policy-delete", + pattern: "delete-queue", + definition: { + "message-ttl": 5000 + }, + "apply-to": "queues" + }; + + await managementClient.createOrUpdateOperatorPolicy("%2F", policy); + await RunMQUtils.delay(100); + + const policyBefore = await managementClient.getOperatorPolicy("%2F", "test-policy-delete"); + expect(policyBefore).not.toBeNull(); + + const result = await managementClient.deleteOperatorPolicy("%2F", "test-policy-delete"); + + expect(result).toBe(true); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy: test-policy-delete") + ); + + const policyAfter = await managementClient.getOperatorPolicy("%2F", "test-policy-delete"); + expect(policyAfter).toBeNull(); + }); + + it('should handle deletion of non-existent policy gracefully', async () => { + const result = await managementClient.deleteOperatorPolicy("%2F", "non-existent-policy"); + + expect(result).toBe(true); // Should return true for 404 (not found) + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy") + ); + }); + + it('should handle deletion errors', async () => { + const result = await managementClient.deleteOperatorPolicy("invalid%vhost", "test-policy"); + + expect(result).toBe(false); + expect(MockedRunMQLogger.error).toHaveBeenCalledWith( + expect.stringContaining("Failed to delete operator policy") + ); + }); + }); + + describe('Concurrent Operations', () => { + it('should handle multiple concurrent policy creations', async () => { + const policies: RabbitMQOperatorPolicy[] = [ + { + name: "test-policy-1", + pattern: "queue-1", + definition: {"message-ttl": 5000}, + "apply-to": "queues" + }, + { + name: "test-policy-2", + pattern: "queue-2", + definition: {"message-ttl": 10000}, + "apply-to": "queues" + } + ]; + + const results = await Promise.all( + policies.map(policy => + managementClient.createOrUpdateOperatorPolicy("%2F", policy) + ) + ); + + expect(results).toEqual([true, true]); + + const policy1 = await managementClient.getOperatorPolicy("%2F", "test-policy-1"); + const policy2 = await managementClient.getOperatorPolicy("%2F", "test-policy-2"); + + expect(policy1).not.toBeNull(); + expect(policy2).not.toBeNull(); + expect(policy1?.definition["message-ttl"]).toBe(5000); + expect(policy2?.definition["message-ttl"]).toBe(10000); + }); + + it('should handle concurrent policy operations (create/delete)', async () => { + const policy: RabbitMQOperatorPolicy = { + name: "concurrent-test-policy", + pattern: "concurrent-queue", + definition: {"message-ttl": 5000}, + "apply-to": "queues" + }; + + await managementClient.createOrUpdateOperatorPolicy("%2F", policy); + await RunMQUtils.delay(100); + + const [createResult, deleteResult] = await Promise.all([ + managementClient.createOrUpdateOperatorPolicy("%2F", { + ...policy, + name: "concurrent-test-policy-2" + }), + managementClient.deleteOperatorPolicy("%2F", "concurrent-test-policy") + ]); + + expect(createResult).toBe(true); + expect(deleteResult).toBe(true); + + await managementClient.deleteOperatorPolicy("%2F", "concurrent-test-policy-2"); + }); + }); + + describe('Network Error Recovery', () => { + it('should handle network timeouts gracefully', async () => { + const timeoutClient = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.nonRoutableHost(), + MockedRunMQLogger + ); + + const policy: RabbitMQOperatorPolicy = { + name: "timeout-test", + pattern: "timeout-queue", + definition: {"message-ttl": 5000}, + "apply-to": "queues" + }; + + const result = await timeoutClient.createOrUpdateOperatorPolicy("%2F", policy); + + expect(result).toBe(false); + expect(MockedRunMQLogger.error).toHaveBeenCalledWith( + expect.stringContaining("Error creating operator policy") + ); + }); + }); +}); \ No newline at end of file diff --git a/tests/e2e/RunMQ.processing.e2e.test.ts b/tests/e2e/RunMQ.processing.e2e.test.ts index 2ad0353..29ff4eb 100644 --- a/tests/e2e/RunMQ.processing.e2e.test.ts +++ b/tests/e2e/RunMQ.processing.e2e.test.ts @@ -3,15 +3,17 @@ import {AmqplibClient} from "@src/core/clients/AmqplibClient"; import {Constants} from "@src/core/constants"; import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers"; import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample"; import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; import {MessageTestUtils} from "@tests/helpers/MessageTestUtils"; +import {faker} from "@faker-js/faker"; describe('RunMQ E2E Tests', () => { const validConfig = RunMQConnectionConfigExample.valid(); + const validConfigWithManagement = RunMQConnectionConfigExample.validWithManagement() beforeEach(() => { jest.clearAllMocks(); @@ -266,6 +268,66 @@ describe('RunMQ E2E Tests', () => { await runMQ.disconnect(); await testingConnection.disconnect(); }) + + it('Should process with TTL policy disabled when usePoliciesForTTL is false', async () => { + const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() + + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger); + let attemptCount = 0; + await runMQ.process("user.created", configuration, + () => { + attemptCount++; + if (attemptCount < 2) { + throw new Error("Retry me"); + } + return Promise.resolve(); + } + ) + + channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'user.created', MessageTestUtils.buffer(RunMQMessageExample.person())) + + await RunMQUtils.delay(700); + expect(attemptCount).toBe(2); + + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 0) + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0) + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0) + await runMQ.disconnect(); + await testingConnection.disconnect(); + }) + + it('Should process with TTL policy when enabled', async () => { + const configuration = RunMQProcessorConfigurationExample.simpleNoSchema(faker.lorem.word(), 1, true); + + const channel = await testingConnection.getChannel(); + await ChannelTestHelpers.deleteQueue(channel, configuration.name); + + const runMQ = await RunMQ.start(validConfigWithManagement, MockedRunMQLogger); + let attemptCount = 0; + await runMQ.process("user.created", configuration, + () => { + attemptCount++; + if (attemptCount < 2) { + throw new Error("Retry me"); + } + return Promise.resolve(); + } + ) + + channel.publish(Constants.ROUTER_EXCHANGE_NAME, 'user.created', MessageTestUtils.buffer(RunMQMessageExample.person())) + + await RunMQUtils.delay(700); + expect(attemptCount).toBe(2); + + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getDLQTopicName(configuration.name), 0) + await ChannelTestHelpers.assertQueueMessageCount(channel, configuration.name, 0) + await ChannelTestHelpers.assertQueueMessageCount(channel, ConsumerCreatorUtils.getRetryDelayTopicName(configuration.name), 0) + await runMQ.disconnect(); + await testingConnection.disconnect(); + }) }) }); diff --git a/tests/e2e/RunMQ.publisher.e2e.test.ts b/tests/e2e/RunMQ.publisher.e2e.test.ts index 34c5956..cf985c2 100644 --- a/tests/e2e/RunMQ.publisher.e2e.test.ts +++ b/tests/e2e/RunMQ.publisher.e2e.test.ts @@ -7,7 +7,7 @@ import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessor import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample"; import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample"; -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; describe('RunMQ Publisher E2E Tests', () => { const validConfig = RunMQConnectionConfigExample.valid(); diff --git a/tests/e2e/RunMQTTLPolicyManager.e2e.test.ts b/tests/e2e/RunMQTTLPolicyManager.e2e.test.ts new file mode 100644 index 0000000..e69e2e7 --- /dev/null +++ b/tests/e2e/RunMQTTLPolicyManager.e2e.test.ts @@ -0,0 +1,193 @@ +import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; + +describe('RunMQTTLPolicyManager E2E Tests', () => { + const validManagementConfig = RabbitMQManagementConfigExample.valid(); + let policyManager: RunMQTTLPolicyManager; + let managementClient: RabbitMQManagementClient; + + beforeEach(() => { + jest.clearAllMocks(); + policyManager = new RunMQTTLPolicyManager(MockedRunMQLogger, validManagementConfig); + managementClient = new RabbitMQManagementClient(validManagementConfig, MockedRunMQLogger); + }); + + afterEach(async () => { + await managementClient.deleteOperatorPolicy("%2F", "ttl-policy-test-queue"); + await managementClient.deleteOperatorPolicy("%2F", "ttl-policy-custom-ttl-queue"); + }); + + describe('Policy Manager Initialization', () => { + it('should initialize successfully when management plugin is enabled', async () => { + await policyManager.initialize(); + + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("RabbitMQ management plugin is enabled") + ); + }); + + it('should handle initialization when management config is not provided', async () => { + const policyManagerWithoutConfig = new RunMQTTLPolicyManager(MockedRunMQLogger); + await policyManagerWithoutConfig.initialize(); + + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + "Management client not configured" + ); + }); + }); + + describe('TTL Policy Application', () => { + beforeEach(async () => { + await policyManager.initialize(); + }); + + it('should apply TTL policy with default TTL when TTL is not specified', async () => { + const topicName = "test-queue"; + const result = await policyManager.apply(topicName); + + expect(result).toBe(true); + + await RunMQUtils.delay(100); + + // Verify the policy was actually created + const policy = await managementClient.getOperatorPolicy("%2F", ConsumerCreatorUtils.getMessageTTLPolicyName(topicName)); + expect(policy).not.toBeNull(); + expect(policy?.definition["message-ttl"]).toBeDefined(); + expect(policy?.pattern).toBe("test-queue"); + + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully set operator policy") + ); + }); + + it('should apply TTL policy with custom TTL value', async () => { + const topicName = "custom-ttl-queue"; + const customTTL = 60000; // 60 seconds + + const result = await policyManager.apply(topicName, customTTL); + + expect(result).toBe(true); + + await RunMQUtils.delay(100); + + // Verify the policy was created with the correct TTL + const policy = await managementClient.getOperatorPolicy("%2F", ConsumerCreatorUtils.getMessageTTLPolicyName(topicName)); + expect(policy).not.toBeNull(); + expect(policy?.definition["message-ttl"]).toBe(customTTL); + expect(policy?.pattern).toBe("custom-ttl-queue"); + + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully set operator policy") + ); + }); + + it('should apply policy with custom vhost', async () => { + const topicName = "vhost-test-queue"; + const customVhost = "%2F"; + + const result = await policyManager.apply(topicName, undefined, customVhost); + + expect(result).toBe(true); + + await RunMQUtils.delay(100); + + await policyManager.cleanup(topicName, customVhost); + }); + + it('should return false when management plugin is not available', async () => { + const policyManagerWithoutPlugin = new RunMQTTLPolicyManager(MockedRunMQLogger); + await policyManagerWithoutPlugin.initialize(); + + const result = await policyManagerWithoutPlugin.apply("test-queue"); + + expect(result).toBe(false); + }); + }); + + describe('Policy Cleanup', () => { + beforeEach(async () => { + await policyManager.initialize(); + }); + + it('should cleanup TTL policy successfully', async () => { + const topicName = "cleanup-test-queue"; + + await policyManager.apply(topicName); + await RunMQUtils.delay(100); + + const policyBefore = await managementClient.getOperatorPolicy("%2F", ConsumerCreatorUtils.getMessageTTLPolicyName(topicName)); + expect(policyBefore).not.toBeNull(); + + await policyManager.cleanup(topicName); + await RunMQUtils.delay(100); + + const policyAfter = await managementClient.getOperatorPolicy("%2F", "ttl-policy-cleanup-test-queue"); + expect(policyAfter).toBeNull(); + + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy") + ); + }); + + it('should handle cleanup when policy does not exist', async () => { + const topicName = "non-existent-queue"; + + await policyManager.cleanup(topicName); + + await RunMQUtils.delay(100); + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy") + ); + }); + + it('should cleanup policy with custom vhost', async () => { + const topicName = "vhost-cleanup-queue"; + const customVhost = "%2F"; + + await policyManager.apply(topicName, undefined, customVhost); + await RunMQUtils.delay(100); + + await policyManager.cleanup(topicName, customVhost); + await RunMQUtils.delay(100); + + expect(MockedRunMQLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy") + ); + }); + + it('should handle cleanup when management plugin is not available', async () => { + const policyManagerWithoutPlugin = new RunMQTTLPolicyManager(MockedRunMQLogger); + await policyManagerWithoutPlugin.initialize(); + + await policyManagerWithoutPlugin.cleanup("test-queue"); + + expect(MockedRunMQLogger.info).not.toHaveBeenCalledWith( + expect.stringContaining("Successfully deleted operator policy") + ); + }); + }); + + describe('Error Handling', () => { + it('should handle network errors gracefully during initialization', async () => { + const errorPolicyManager = new RunMQTTLPolicyManager(MockedRunMQLogger, RabbitMQManagementConfigExample.invalid()); + await errorPolicyManager.initialize(); + + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + expect.stringContaining("Management plugin not accessible") + ); + }); + + it('should handle authentication errors', async () => { + const authErrorPolicyManager = new RunMQTTLPolicyManager(MockedRunMQLogger, RabbitMQManagementConfigExample.invalid()); + await authErrorPolicyManager.initialize(); + + expect(MockedRunMQLogger.warn).toHaveBeenCalledWith( + expect.stringMatching(/Management plugin not accessible|RabbitMQ management plugin is not enabled/) + ); + }); + }); +}); \ No newline at end of file diff --git a/tests/mocks/MockedRabbitMQMessage.ts b/tests/mocks/MockedRabbitMQMessage.ts index 8f3a228..86f2af1 100644 --- a/tests/mocks/MockedRabbitMQMessage.ts +++ b/tests/mocks/MockedRabbitMQMessage.ts @@ -1,4 +1,4 @@ -import {RunMQUtils} from "@src/core/utils/Utils"; +import {RunMQUtils} from "@src/core/utils/RunMQUtils"; import {MessageExample} from "@tests/Examples/MessageExample"; import {MockedRabbitMQChannel} from "@tests/mocks/MockedRabbitMQChannel"; import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage"; diff --git a/tests/unit/core/RunMQ.test.ts b/tests/unit/core/RunMQ.test.ts index f2b8059..cf69d1c 100644 --- a/tests/unit/core/RunMQ.test.ts +++ b/tests/unit/core/RunMQ.test.ts @@ -2,7 +2,7 @@ import {RunMQ} from '@src/core/RunMQ'; import {AmqplibClient} from '@src/core/clients/AmqplibClient'; import {RunMQException} from '@src/core/exceptions/RunMQException'; import {Exceptions} from '@src/core/exceptions/Exceptions'; -import {RunMQUtils} from '@src/core/utils/Utils'; +import {RunMQUtils} from '@src/core/utils/RunMQUtils'; import {RunMQConsumerCreator} from '@src/core/consumer/RunMQConsumerCreator'; import {Channel} from 'amqplib'; import {Constants} from '@src/core/constants'; @@ -14,7 +14,7 @@ import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator"; jest.mock('@src/core/clients/AmqplibClient'); -jest.mock('@src/core/utils/Utils'); +jest.mock('@src/core/utils/RunMQUtils'); jest.mock('@src/core/consumer/RunMQConsumerCreator'); jest.mock('@src/core/publisher/RunMQPublisherCreator'); @@ -127,13 +127,14 @@ describe('RunMQ Unit Tests', () => { expect(mockConsumerCreator).toHaveBeenCalledWith( mockChannel, expect.any(AmqplibClient), - expect.any(Object) + expect.any(Object), + undefined ); expect(mockConsumerCreator.prototype.createConsumer).toHaveBeenCalledWith( expect.objectContaining({ topic: 'test.topic', processorConfig: processorConfig, - processor: processor + processor: processor, }) ); }); diff --git a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts index 6dcc411..a985eb4 100644 --- a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts +++ b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts @@ -6,10 +6,18 @@ import {ConsumerConfigurationExample} from "@tests/Examples/ConsumerConfiguratio import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator"; import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger"; import {MockedAMQPClient} from "@tests/mocks/MockedAMQPClient"; +import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RunMQException} from "@src/core/exceptions/RunMQException"; + +jest.mock('@src/core/management/Policies/RunMQTTLPolicyManager'); describe('RunMQConsumerCreator Unit Tests', () => { const mockedChannel = new MockedRabbitMQChannel(); const mockedClient = new MockedAMQPClient(mockedChannel); + const mockTTLPolicyManager = { + initialize: jest.fn(), + apply: jest.fn() + }; const testProcessorConfig = RunMQProcessorConfigurationExample.random( 'testProcessor', @@ -17,15 +25,29 @@ describe('RunMQConsumerCreator Unit Tests', () => { 2, 5000 ); + const testProcessorConfigWithPolicies = RunMQProcessorConfigurationExample.random( + 'testProcessor', + 3, + 2, + 5000, + undefined, + true + ); + const testConsumerConfigWithPolicies = ConsumerConfigurationExample.withProcessorConfig(testProcessorConfigWithPolicies) + const testConsumerConfig = ConsumerConfigurationExample.withProcessorConfig(testProcessorConfig) - const consumerCreator = new RunMQConsumerCreator(mockedChannel, mockedClient, MockedRunMQLogger) + let consumerCreator: RunMQConsumerCreator; beforeEach(() => { jest.clearAllMocks(); + jest.mocked(RunMQTTLPolicyManager).mockImplementation(() => mockTTLPolicyManager as any); + mockTTLPolicyManager.initialize.mockResolvedValue(undefined); + mockTTLPolicyManager.apply.mockResolvedValue(true); + consumerCreator = new RunMQConsumerCreator(mockedChannel, mockedClient, MockedRunMQLogger, undefined); }); describe('createConsumer', () => { - it('should create consumer with correct queue assertions', async () => { + it('should create consumer with correct queue assertions when usePoliciesForTTL is false', async () => { await consumerCreator.createConsumer(testConsumerConfig); expect(mockedChannel.assertQueue).toHaveBeenCalledWith( @@ -54,6 +76,33 @@ describe('RunMQConsumerCreator Unit Tests', () => { deadLetterRoutingKey: testProcessorConfig.name } ); + + expect(mockTTLPolicyManager.apply).not.toHaveBeenCalled(); + }); + + it('should use TTL policies when usePoliciesForTTL is true', async () => { + + await consumerCreator.createConsumer(testConsumerConfigWithPolicies); + + expect(mockTTLPolicyManager.apply).toHaveBeenCalledWith( + Constants.RETRY_DELAY_QUEUE_PREFIX + testProcessorConfig.name, + testProcessorConfig.attemptsDelay + ); + + expect(mockedChannel.assertQueue).toHaveBeenCalledWith( + Constants.RETRY_DELAY_QUEUE_PREFIX + testProcessorConfig.name, + { + durable: true, + deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME + } + ); + }); + + it('should throw exception when TTL policy application fails', async () => { + mockTTLPolicyManager.apply.mockResolvedValue(false); + + await expect(consumerCreator.createConsumer(testConsumerConfigWithPolicies)) + .rejects.toThrow(RunMQException); }); it('should bind queues correctly', async () => { diff --git a/tests/unit/core/management/RabbitMQManagementClient.test.ts b/tests/unit/core/management/RabbitMQManagementClient.test.ts new file mode 100644 index 0000000..8b7673a --- /dev/null +++ b/tests/unit/core/management/RabbitMQManagementClient.test.ts @@ -0,0 +1,147 @@ +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RunMQConsoleLogger} from "@src/core/logging/RunMQConsoleLogger"; +import {RabbitMQMessageTTLPolicyExample} from "@tests/Examples/RabbitMQOperatorPolicyExamples"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; +import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils"; + +describe('RabbitMQManagementClient', () => { + let client: RabbitMQManagementClient; + let logger: RunMQConsoleLogger; + let fetchSpy: jest.SpyInstance; + const MANAGEMENT_URL = "http://localhost:15673/api/operator-policies/%2F/" + + beforeEach(() => { + logger = new RunMQConsoleLogger(); + client = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.valid(), + logger + ); + + global.fetch = jest.fn(); + fetchSpy = jest.spyOn(global, 'fetch'); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('createOperatorPolicy', () => { + it('should successfully create an operator policy', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 201, + text: async () => '' + } as Response); + + const result = await client.createOrUpdateOperatorPolicy('%2F', RabbitMQMessageTTLPolicyExample.validPolicy('test-queue', 5000)); + + expect(result).toBe(true); + expect(fetchSpy).toHaveBeenCalledWith( + MANAGEMENT_URL + ConsumerCreatorUtils.getMessageTTLPolicyName('test-queue'), + expect.objectContaining({ + method: 'PUT', + headers: expect.objectContaining({ + 'Content-Type': 'application/json', + 'Authorization': expect.stringContaining('Basic') + }), + body: JSON.stringify({ + pattern: 'test-queue', + definition: { + 'message-ttl': 5000 + }, + priority: 1000, + 'apply-to': 'queues' + }) + }) + ); + }); + + it('should handle failed policy creation', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 400, + text: async () => 'Bad Request' + } as Response); + + const result = await client.createOrUpdateOperatorPolicy('%2F', RabbitMQMessageTTLPolicyExample.validPolicy()); + + expect(result).toBe(false); + }); + }); + + describe('checkManagementPluginEnabled', () => { + it('should return true when management plugin is accessible', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 200 + } as Response); + + const result = await client.checkManagementPluginEnabled(); + + expect(result).toBe(true); + expect(fetchSpy).toHaveBeenCalledWith( + 'http://localhost:15673/api/overview', + expect.objectContaining({ + method: 'GET', + headers: expect.objectContaining({ + 'Authorization': expect.stringContaining('Basic') + }) + }) + ); + }); + + it('should return false when management plugin is not accessible', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 404 + } as Response); + + const result = await client.checkManagementPluginEnabled(); + + expect(result).toBe(false); + }); + + it('should return false on network error', async () => { + fetchSpy.mockRejectedValueOnce(new Error('Network error')); + + const result = await client.checkManagementPluginEnabled(); + + expect(result).toBe(false); + }); + }); + + describe('deleteOperatorPolicy', () => { + it('should successfully delete an operator policy', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: true, + status: 204, + text: async () => '' + } as Response); + + const result = await client.deleteOperatorPolicy('%2F', 'test-queue'); + + expect(result).toBe(true); + expect(fetchSpy).toHaveBeenCalledWith( + MANAGEMENT_URL + 'test-queue', + expect.objectContaining({ + method: 'DELETE', + headers: expect.objectContaining({ + 'Authorization': expect.stringContaining('Basic') + }) + }) + ); + }); + + it('should handle 404 as success', async () => { + fetchSpy.mockResolvedValueOnce({ + ok: false, + status: 404, + text: async () => 'Not Found' + } as Response); + + const result = await client.deleteOperatorPolicy('%2F', 'test-policy'); + + expect(result).toBe(true); + }); + }); +}); \ No newline at end of file diff --git a/tests/unit/core/management/RunMQTTLPolicyManager.test.ts b/tests/unit/core/management/RunMQTTLPolicyManager.test.ts new file mode 100644 index 0000000..97fec98 --- /dev/null +++ b/tests/unit/core/management/RunMQTTLPolicyManager.test.ts @@ -0,0 +1,138 @@ +import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager"; +import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient"; +import {RunMQConsoleLogger} from "@src/core/logging/RunMQConsoleLogger"; +import {RabbitMQMessageTTLPolicyExample} from "@tests/Examples/RabbitMQOperatorPolicyExamples"; +import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample"; + +jest.mock("@src/core/management/RabbitMQManagementClient"); + +describe('TTLPolicyManager', () => { + let ttlPolicyManager: RunMQTTLPolicyManager; + let logger: RunMQConsoleLogger; + let mockManagementClient: jest.Mocked; + + beforeEach(() => { + logger = new RunMQConsoleLogger(); + jest.spyOn(logger, 'info').mockImplementation(); + jest.spyOn(logger, 'warn').mockImplementation(); + + mockManagementClient = new RabbitMQManagementClient( + RabbitMQManagementConfigExample.valid(), + logger + ) as jest.Mocked; + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('without management config', () => { + beforeEach(() => { + ttlPolicyManager = new RunMQTTLPolicyManager(logger); + }); + + it('should initialize without management client', async () => { + await ttlPolicyManager.initialize(); + + expect(logger.warn).toHaveBeenCalledWith( + "Management client not configured" + ); + }); + + it('should return false when management is not configured', async () => { + await ttlPolicyManager.initialize(); + const result = await ttlPolicyManager.apply('test-queue', 5000); + expect(result).toBe(false); + }); + }); + + describe('with management config', () => { + beforeEach(() => { + ttlPolicyManager = new RunMQTTLPolicyManager(logger, RabbitMQManagementConfigExample.valid()); + + (ttlPolicyManager as any).managementClient = mockManagementClient; + }); + + it('should check if management plugin is enabled during initialization', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + + await ttlPolicyManager.initialize(); + + expect(mockManagementClient.checkManagementPluginEnabled).toHaveBeenCalled(); + expect(logger.info).toHaveBeenCalledWith( + "RabbitMQ management plugin is enabled" + ); + }); + + it('should return false when management plugin is not enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(false); + + await ttlPolicyManager.initialize(); + const result = await ttlPolicyManager.apply('test-queue', 5000); + expect(result).toBe(false); + }); + + it('should create operator policy when management plugin is enabled', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.createOrUpdateOperatorPolicy.mockResolvedValue(true); + + await ttlPolicyManager.initialize(); + await ttlPolicyManager.apply('test-queue', 5000); + + expect(mockManagementClient.createOrUpdateOperatorPolicy).toHaveBeenCalledWith( + '%2F', + RabbitMQMessageTTLPolicyExample.validPolicy('test-queue', 5000) + ); + }); + + it('should return false when policy creation fails', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.createOrUpdateOperatorPolicy.mockResolvedValue(false); + + await ttlPolicyManager.initialize(); + const result = await ttlPolicyManager.apply('test-queue', 5000); + expect(result).toBe(false); + }); + + it('should escape special characters in queue name for policy pattern', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.createOrUpdateOperatorPolicy.mockResolvedValue(true); + + await ttlPolicyManager.initialize(); + await ttlPolicyManager.apply('test.queue[special]', 5000); + + expect(mockManagementClient.createOrUpdateOperatorPolicy).toHaveBeenCalledWith( + '%2F', + expect.objectContaining({ + pattern: 'test\\.queue\\[special\\]' + }) + ); + }); + + it('should cleanup policy when requested', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.deleteOperatorPolicy.mockResolvedValue(true); + + await ttlPolicyManager.initialize(); + await ttlPolicyManager.cleanup('test-queue'); + + expect(mockManagementClient.deleteOperatorPolicy).toHaveBeenCalledWith( + '%2F', + 'ttl-policy-test-queue' + ); + }); + + it('should handle custom vhost', async () => { + mockManagementClient.checkManagementPluginEnabled.mockResolvedValue(true); + mockManagementClient.createOrUpdateOperatorPolicy.mockResolvedValue(true); + + await ttlPolicyManager.initialize(); + await ttlPolicyManager.apply('test-queue', 5000, 'custom-vhost'); + + expect(mockManagementClient.createOrUpdateOperatorPolicy).toHaveBeenCalledWith( + 'custom-vhost', + expect.any(Object) + ); + }); + }); +}); \ No newline at end of file From cb4c0dd7ca7cd406e7116d053e94073926456290 Mon Sep 17 00:00:00 2001 From: Fawzi Essam Date: Sun, 9 Nov 2025 02:15:36 +0100 Subject: [PATCH 2/2] improving naming Signed-off-by: Fawzi Essam --- src/core/consumer/RunMQConsumerCreator.ts | 2 +- src/types/index.ts | 4 ++-- tests/Examples/RunMQProcessorConfigurationExample.ts | 8 ++++---- tests/e2e/RunMQ.processing.e2e.test.ts | 2 +- tests/unit/core/consumer/RunMQConsumerCreator.test.ts | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/core/consumer/RunMQConsumerCreator.ts b/src/core/consumer/RunMQConsumerCreator.ts index 973adf0..047bf4d 100644 --- a/src/core/consumer/RunMQConsumerCreator.ts +++ b/src/core/consumer/RunMQConsumerCreator.ts @@ -95,7 +95,7 @@ export class RunMQConsumerCreator { const messageDelay = consumerConfiguration.processorConfig.attemptsDelay ?? DEFAULTS.PROCESSING_RETRY_DELAY - const policiesForTTL = consumerConfiguration.processorConfig.usePoliciesForTTL ?? false; + const policiesForTTL = consumerConfiguration.processorConfig.usePoliciesForDelay ?? false; if (!policiesForTTL) { await this.defaultChannel.assertQueue(retryDelayQueueName, { durable: true, diff --git a/src/types/index.ts b/src/types/index.ts index bd8c4fa..37ceff7 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -65,13 +65,13 @@ export interface RunMQProcessorConfiguration { messageSchema?: MessageSchema /** - * Whether to use RabbitMQ policies for setting TTL instead of queue-level TTL. + * Whether to use RabbitMQ policies for setting the attempts delay instead of queue-level TTL. * Requires management configuration to be provided in RunMQConnectionConfig. * Default is false. * * Recommended to use it for flexibility. */ - usePoliciesForTTL?: boolean; + usePoliciesForDelay?: boolean; } export interface RabbitMQManagementConfig { diff --git a/tests/Examples/RunMQProcessorConfigurationExample.ts b/tests/Examples/RunMQProcessorConfigurationExample.ts index 95c5121..86b9b47 100644 --- a/tests/Examples/RunMQProcessorConfigurationExample.ts +++ b/tests/Examples/RunMQProcessorConfigurationExample.ts @@ -8,7 +8,7 @@ export class RunMQProcessorConfigurationExample { attempts = faker.number.int({min: 0, max: 10}), attemptsDelay = faker.number.int({min: 10, max: 1000}), messageSchema = MessageSchemaExample.random(), - usePoliciesForTTL: boolean = false + usePoliciesForDelay: boolean = false ): RunMQProcessorConfiguration { return { name, @@ -16,14 +16,14 @@ export class RunMQProcessorConfigurationExample { attempts, attemptsDelay, messageSchema, - usePoliciesForTTL + usePoliciesForDelay } } static simpleNoSchema( name: string = faker.lorem.word(), consumersCount: number = 1, - usePoliciesForTTL: boolean = false + usePoliciesForDelay: boolean = false ): RunMQProcessorConfiguration { return this.random( name, @@ -31,7 +31,7 @@ export class RunMQProcessorConfigurationExample { 3, 100, undefined, - usePoliciesForTTL + usePoliciesForDelay ); } diff --git a/tests/e2e/RunMQ.processing.e2e.test.ts b/tests/e2e/RunMQ.processing.e2e.test.ts index 29ff4eb..91055ac 100644 --- a/tests/e2e/RunMQ.processing.e2e.test.ts +++ b/tests/e2e/RunMQ.processing.e2e.test.ts @@ -269,7 +269,7 @@ describe('RunMQ E2E Tests', () => { await testingConnection.disconnect(); }) - it('Should process with TTL policy disabled when usePoliciesForTTL is false', async () => { + it('Should process with TTL policy disabled when usePoliciesForDelay is false', async () => { const configuration = RunMQProcessorConfigurationExample.simpleNoSchema() const channel = await testingConnection.getChannel(); diff --git a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts index a985eb4..5d8f6d5 100644 --- a/tests/unit/core/consumer/RunMQConsumerCreator.test.ts +++ b/tests/unit/core/consumer/RunMQConsumerCreator.test.ts @@ -47,7 +47,7 @@ describe('RunMQConsumerCreator Unit Tests', () => { }); describe('createConsumer', () => { - it('should create consumer with correct queue assertions when usePoliciesForTTL is false', async () => { + it('should create consumer with correct queue assertions when usePoliciesForDelay is false', async () => { await consumerCreator.createConsumer(testConsumerConfig); expect(mockedChannel.assertQueue).toHaveBeenCalledWith( @@ -80,7 +80,7 @@ describe('RunMQConsumerCreator Unit Tests', () => { expect(mockTTLPolicyManager.apply).not.toHaveBeenCalled(); }); - it('should use TTL policies when usePoliciesForTTL is true', async () => { + it('should use TTL policies when usePoliciesForDelay is true', async () => { await consumerCreator.createConsumer(testConsumerConfigWithPolicies);