Skip to content

Commit bbe1c35

Browse files
committed
feat: supporting policies for TTL instead of queue-level TTL, with default false
Signed-off-by: Fawzi Essam <iifawzie@gmail.com>
1 parent 21bb8b8 commit bbe1c35

26 files changed

Lines changed: 1261 additions & 24 deletions

src/core/RunMQ.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import {RunMQProcessorConfiguration, RunMQConnectionConfig, RunMQPublisher, RunM
22
import {RunMQException} from "@src/core/exceptions/RunMQException";
33
import {AmqplibClient} from "@src/core/clients/AmqplibClient";
44
import {Exceptions} from "@src/core/exceptions/Exceptions";
5-
import {RunMQUtils} from "@src/core/utils/Utils";
5+
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
66
import {Constants, DEFAULTS} from "@src/core/constants";
77
import {Channel} from "amqplib";
88
import {RunMQConsumerCreator} from "@src/core/consumer/RunMQConsumerCreator";
@@ -51,7 +51,7 @@ export class RunMQ {
5151
* @param processor The function that will process the incoming messages
5252
*/
5353
public async process<T = Record<string, never>>(topic: string, config: RunMQProcessorConfiguration, processor: (message: RunMQMessageContent<T>) => Promise<void>) {
54-
const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger);
54+
const consumer = new RunMQConsumerCreator(this.defaultChannel!, this.amqplibClient, this.logger, this.config.management);
5555
await consumer.createConsumer<T>(new ConsumerConfiguration(topic, config, processor))
5656
}
5757

src/core/constants/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export const Constants = {
44
DEAD_LETTER_ROUTER_EXCHANGE_NAME: RUNMQ_PREFIX + "dead_letter_router",
55
RETRY_DELAY_QUEUE_PREFIX: RUNMQ_PREFIX + "retry_delay_",
66
DLQ_QUEUE_PREFIX: RUNMQ_PREFIX + "dlq_",
7+
MESSAGE_TTL_OPERATOR_POLICY_PREFIX: RUNMQ_PREFIX + "message_ttl_operator_policy",
78
}
89

910
export const DEFAULTS = {

src/core/consumer/ConsumerCreatorUtils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ export class ConsumerCreatorUtils {
77
static getRetryDelayTopicName(topic: string): string {
88
return Constants.RETRY_DELAY_QUEUE_PREFIX + topic;
99
}
10+
11+
static getMessageTTLPolicyName(topic: string): string {
12+
return Constants.MESSAGE_TTL_OPERATOR_POLICY_PREFIX + topic;
13+
}
1014
}

src/core/consumer/RunMQConsumerCreator.ts

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,25 @@ import {RunMQLogger} from "@src/core/logging/RunMQLogger";
1414
import {DefaultDeserializer} from "@src/core/serializers/deserializer/DefaultDeserializer";
1515
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
1616
import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator";
17-
import {AMQPClient} from "@src/types";
17+
import {AMQPClient, RabbitMQManagementConfig} from "@src/types";
18+
import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager";
19+
import {RunMQException} from "@src/core/exceptions/RunMQException";
20+
import {Exceptions} from "@src/core/exceptions/Exceptions";
1821

1922
export class RunMQConsumerCreator {
23+
private ttlPolicyManager: RunMQTTLPolicyManager;
24+
2025
constructor(
2126
private defaultChannel: Channel,
2227
private client: AMQPClient,
2328
private logger: RunMQLogger,
29+
managementConfig?: RabbitMQManagementConfig
2430
) {
31+
this.ttlPolicyManager = new RunMQTTLPolicyManager(logger, managementConfig);
2532
}
2633

27-
2834
public async createConsumer<T>(consumerConfiguration: ConsumerConfiguration<T>) {
35+
await this.ttlPolicyManager.initialize();
2936
await this.assertQueues<T>(consumerConfiguration);
3037
await this.bindQueues<T>(consumerConfiguration);
3138
for (let i = 0; i < consumerConfiguration.processorConfig.consumersCount; i++) {
@@ -78,16 +85,43 @@ export class RunMQConsumerCreator {
7885
deadLetterExchange: Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
7986
deadLetterRoutingKey: consumerConfiguration.processorConfig.name
8087
});
81-
await this.defaultChannel.assertQueue(ConsumerCreatorUtils.getRetryDelayTopicName(consumerConfiguration.processorConfig.name), {
82-
durable: true,
83-
deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME,
84-
messageTtl: consumerConfiguration.processorConfig.attemptsDelay ?? DEFAULTS.PROCESSING_RETRY_DELAY,
85-
});
8688
await this.defaultChannel.assertQueue(ConsumerCreatorUtils.getDLQTopicName(consumerConfiguration.processorConfig.name), {
8789
durable: true,
8890
deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME,
8991
deadLetterRoutingKey: consumerConfiguration.processorConfig.name
9092
});
93+
94+
const retryDelayQueueName = ConsumerCreatorUtils.getRetryDelayTopicName(consumerConfiguration.processorConfig.name);
95+
const messageDelay = consumerConfiguration.processorConfig.attemptsDelay ?? DEFAULTS.PROCESSING_RETRY_DELAY
96+
97+
98+
const policiesForTTL = consumerConfiguration.processorConfig.usePoliciesForTTL ?? false;
99+
if (!policiesForTTL) {
100+
await this.defaultChannel.assertQueue(retryDelayQueueName, {
101+
durable: true,
102+
deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME,
103+
messageTtl: messageDelay,
104+
});
105+
return;
106+
}
107+
108+
const result = await this.ttlPolicyManager.apply(
109+
retryDelayQueueName,
110+
messageDelay
111+
);
112+
if (result) {
113+
await this.defaultChannel.assertQueue(retryDelayQueueName, {
114+
durable: true,
115+
deadLetterExchange: Constants.ROUTER_EXCHANGE_NAME
116+
});
117+
return;
118+
}
119+
throw new RunMQException(
120+
Exceptions.FAILURE_TO_DEFINE_TTL_POLICY,
121+
{
122+
error: "Failed to apply TTL policy to queue: " + retryDelayQueueName
123+
}
124+
);
91125
}
92126

93127

src/core/exceptions/Exceptions.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ export class Exceptions {
44
public static NOT_INITIALIZED = 'NOT_INITIALIZED';
55
public static INVALID_MESSAGE_FORMAT = 'MESSAGE_SHOULD_BE_VALID_RECORD';
66
public static UNSUPPORTED_SCHEMA = 'UNSUPPORTED_SCHEMA';
7+
public static FAILURE_TO_DEFINE_TTL_POLICY = 'FAILURE_TO_DEFINE_TTL_POLICY';
78
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import {RabbitMQOperatorPolicy} from "@src/types";
2+
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
3+
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
4+
5+
export class RabbitMQMessageTTLPolicy {
6+
static createFor(queueName: string, ttl: number): RabbitMQOperatorPolicy {
7+
return {
8+
name: ConsumerCreatorUtils.getMessageTTLPolicyName(queueName),
9+
pattern: RunMQUtils.escapeRegExp(queueName),
10+
definition: {
11+
"message-ttl": ttl
12+
},
13+
"apply-to": "queues",
14+
priority: 1000
15+
}
16+
}
17+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import {RabbitMQManagementClient} from "@src/core/management/RabbitMQManagementClient";
2+
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
3+
import {DEFAULTS} from "@src/core/constants";
4+
import {RabbitMQManagementConfig} from "@src";
5+
import {RabbitMQMessageTTLPolicy} from "@src/core/management/Policies/RabbitMQMessageTTLPolicy";
6+
7+
export class RunMQTTLPolicyManager {
8+
private readonly managementClient: RabbitMQManagementClient | null = null;
9+
private isManagementPluginEnabled = false;
10+
11+
constructor(
12+
private logger: RunMQLogger,
13+
private managementConfig?: RabbitMQManagementConfig
14+
) {
15+
if (this.managementConfig) {
16+
this.managementClient = new RabbitMQManagementClient(this.managementConfig, this.logger);
17+
}
18+
}
19+
20+
public async initialize(): Promise<void> {
21+
if (!this.managementClient) {
22+
this.logger.warn("Management client not configured");
23+
return;
24+
}
25+
26+
this.isManagementPluginEnabled = await this.managementClient.checkManagementPluginEnabled();
27+
28+
if (!this.isManagementPluginEnabled) {
29+
this.logger.warn("RabbitMQ management plugin is not enabled");
30+
} else {
31+
this.logger.info("RabbitMQ management plugin is enabled");
32+
}
33+
}
34+
35+
public async apply(
36+
queueName: string,
37+
ttl?: number,
38+
vhost: string = "%2F"
39+
): Promise<boolean> {
40+
const actualTTL = ttl ?? DEFAULTS.PROCESSING_RETRY_DELAY;
41+
42+
if (this.isManagementPluginEnabled && this.managementClient) {
43+
const success = await this.managementClient.createOrUpdateOperatorPolicy(
44+
vhost,
45+
RabbitMQMessageTTLPolicy.createFor(queueName, actualTTL)
46+
);
47+
48+
if (success) {
49+
return true
50+
}
51+
}
52+
return false;
53+
}
54+
55+
56+
public async cleanup(queueName: string, vhost: string = "%2F"): Promise<void> {
57+
if (this.isManagementPluginEnabled && this.managementClient) {
58+
const policyName = `ttl-policy-${queueName}`;
59+
await this.managementClient.deleteOperatorPolicy(vhost, policyName);
60+
}
61+
}
62+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
2+
import {RabbitMQManagementConfig} from "@src";
3+
import {RabbitMQOperatorPolicy} from "@src/types";
4+
5+
export class RabbitMQManagementClient {
6+
constructor(
7+
private config: RabbitMQManagementConfig,
8+
private logger: RunMQLogger
9+
) {}
10+
11+
private getAuthHeader(): string {
12+
const credentials = Buffer.from(`${this.config.username}:${this.config.password}`).toString('base64');
13+
return `Basic ${credentials}`;
14+
}
15+
16+
public async createOrUpdateOperatorPolicy(vhost: string, policy: RabbitMQOperatorPolicy): Promise<boolean> {
17+
try {
18+
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policy.name)}`;
19+
20+
const response = await fetch(url, {
21+
method: 'PUT',
22+
headers: {
23+
'Content-Type': 'application/json',
24+
'Authorization': this.getAuthHeader()
25+
},
26+
body: JSON.stringify({
27+
pattern: policy.pattern,
28+
definition: policy.definition,
29+
priority: policy.priority || 0,
30+
"apply-to": policy["apply-to"]
31+
})
32+
});
33+
34+
if (!response.ok) {
35+
const error = await response.text();
36+
this.logger.error(`Failed to create operator policy: ${response.status} - ${error}`);
37+
return false;
38+
}
39+
40+
this.logger.info(`Successfully set operator policy: ${policy.name}`);
41+
return true;
42+
} catch (error) {
43+
this.logger.error(`Error creating operator policy: ${error}`);
44+
return false;
45+
}
46+
}
47+
48+
public async getOperatorPolicy(vhost: string, policyName: string): Promise<RabbitMQOperatorPolicy | null> {
49+
try {
50+
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`;
51+
52+
const response = await fetch(url, {
53+
method: 'GET',
54+
headers: {
55+
'Authorization': this.getAuthHeader()
56+
}
57+
});
58+
59+
if (!response.ok) {
60+
if (response.status === 404) {
61+
return null;
62+
}
63+
const error = await response.text();
64+
this.logger.error(`Failed to get operator policy: ${response.status} - ${error}`);
65+
return null;
66+
}
67+
68+
return await response.json();
69+
} catch (error) {
70+
this.logger.error(`Error getting operator policy: ${error}`);
71+
return null;
72+
}
73+
}
74+
75+
public async deleteOperatorPolicy(vhost: string, policyName: string): Promise<boolean> {
76+
try {
77+
const url = `${this.config.url}/api/operator-policies/${vhost}/${encodeURIComponent(policyName)}`;
78+
79+
const response = await fetch(url, {
80+
method: 'DELETE',
81+
headers: {
82+
'Authorization': this.getAuthHeader()
83+
}
84+
});
85+
86+
if (!response.ok && response.status !== 404) {
87+
const error = await response.text();
88+
this.logger.error(`Failed to delete operator policy: ${response.status} - ${error}`);
89+
return false;
90+
}
91+
92+
this.logger.info(`Successfully deleted operator policy: ${policyName}`);
93+
return true;
94+
} catch (error) {
95+
this.logger.error(`Error deleting operator policy: ${error}`);
96+
return false;
97+
}
98+
}
99+
100+
public async checkManagementPluginEnabled(): Promise<boolean> {
101+
try {
102+
const url = `${this.config.url}/api/overview`;
103+
104+
const response = await fetch(url, {
105+
method: 'GET',
106+
headers: {
107+
'Authorization': this.getAuthHeader()
108+
}
109+
});
110+
111+
return response.ok;
112+
} catch (error) {
113+
this.logger.warn(`Management plugin not accessible: ${error}`);
114+
return false;
115+
}
116+
}
117+
}

src/core/message/RabbitMQMessage.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {Channel} from "amqplib";
2-
import {RunMQUtils} from "@src/core/utils/Utils";
2+
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
33
import {RabbitMQMessageProperties} from "@src/core/message/RabbitMQMessageProperties";
44
import {AMQPMessage} from "@src/core/message/AmqpMessage";
55

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ export class RunMQUtils {
1616
throw new RunMQException(Exceptions.INVALID_MESSAGE_FORMAT, {});
1717
}
1818
}
19+
20+
public static escapeRegExp(string: string): string {
21+
return string.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
22+
}
1923
}

0 commit comments

Comments
 (0)