diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..8c406aa --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,18 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "address": "127.0.0.1", + "localRoot": "${workspaceFolder}", + "name": "Debug", + "port": 9229, + "remoteRoot": "/app", + "request": "attach", + "restart": true, + "skipFiles": [ + "/**" + ], + "type": "pwa-node" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 3b8d430..ba483e7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -18,6 +18,6 @@ } ], "editor.codeActionsOnSave": { - "source.fixAll.eslint": true + "source.fixAll.eslint": "explicit" } } \ No newline at end of file diff --git a/docker-compose.override.yml b/docker-compose.override.yml new file mode 100644 index 0000000..6354704 --- /dev/null +++ b/docker-compose.override.yml @@ -0,0 +1,17 @@ +version: "3" + +services: + rabbitmq: + image: rabbitmq:3-management + environment: + RABBITMQ_DEFAULT_VHOST: test + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:15672"] + interval: 30s + timeout: 10s + retries: 5 + ports: + - "15672:15672" + - "5672:5672" + volumes: + - ./docker/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a649919 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3' +services: + worker: &base + image: rabbit-base-image + build: + context: . + dockerfile: docker/dev/Dockerfile + volumes: + - ./:/app + # env_file: .env + ports: + - 9229:9229 + depends_on: + - rabbitmq diff --git a/docker/dev/Dockerfile b/docker/dev/Dockerfile new file mode 100644 index 0000000..f446223 --- /dev/null +++ b/docker/dev/Dockerfile @@ -0,0 +1,5 @@ +FROM node:20-alpine + +WORKDIR /app + +CMD yarn test \ No newline at end of file diff --git a/docker/rabbitmq/enabled_plugins b/docker/rabbitmq/enabled_plugins new file mode 100644 index 0000000..6a211a0 --- /dev/null +++ b/docker/rabbitmq/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_management,rabbitmq_prometheus,rabbitmq_shovel,rabbitmq_shovel_management]. diff --git a/package.json b/package.json index 094114f..308b6a0 100644 --- a/package.json +++ b/package.json @@ -10,11 +10,13 @@ "format": "prettier-eslint src/**/*.ts --write", "lint": "eslint --ext .ts src", "build": "tsc", - "prepare": "tsc" + "prepare": "tsc", + "test": "tsnd --inspect=0.0.0.0:9229 --respawn --no-notify --poll --no-deps --ignore-watch node_modules --transpile-only src/test.ts" + }, "devDependencies": { "@eduzz/eslint-config-houston": "^1.0.14", - "@types/amqplib": "^0.10.1", + "@types/amqplib": "^0.10.4", "@types/node": "^18.15.11", "@types/winston": "^2.4.4", "@typescript-eslint/eslint-plugin": "^5.58.0", diff --git a/src/Queue.ts b/src/Queue.ts index 948b95d..7199b7c 100644 --- a/src/Queue.ts +++ b/src/Queue.ts @@ -29,7 +29,7 @@ export class Queue { } public async listen(callback: (data: T, message?: amqp.ConsumeMessage) => Promise) { - if (this.options.retryTimeout > 0 && this.options.deadLetterAfter <= 0) { + if (this.hasTimeout() && !this.hasDeadLetter()) { throw new Error('If you use retryTimeout, you need to specify a deadLetterAfter'); } @@ -197,7 +197,7 @@ export class Queue { } private async configureDLQQueue(ch: amqp.Channel) { - if (this.options.deadLetterAfter < 1) { + if (!this.hasDeadLetter()) { return; } @@ -219,7 +219,7 @@ export class Queue { let args: Record = {}; - if (this.options.retryTimeout > 0) { + if (this.hasTimeout()) { args = { 'x-dead-letter-exchange': exchange, 'x-dead-letter-routing-key': this.options.names.retryTopic, @@ -255,13 +255,13 @@ export class Queue { await ch.bindQueue(this.options.names.queueName, exchange, topic); } - if (this.options.enableNack && this.options.retryTimeout) { + if (this.options.enableNack && this.hasTimeout()) { await ch.bindQueue(this.options.names.queueName, exchange, this.options.names.retryTopic); } } private async handleFailedMessage(channel: amqp.Channel, msg: amqp.ConsumeMessage) { - if (!msg.properties?.headers['x-death']) { + if (!this.hasDeadLetter() || !msg.properties?.headers['x-death']) { channel.nack(msg, false, false); return; } @@ -276,4 +276,12 @@ export class Queue { channel.nack(msg, false, false); } + + private hasTimeout() { + return this.options.retryTimeout > 0; + } + + private hasDeadLetter() { + return this.options.deadLetterAfter > 0; + } } diff --git a/src/test.ts b/src/test.ts index e1d50ac..38ce43f 100644 --- a/src/test.ts +++ b/src/test.ts @@ -1,39 +1,32 @@ import { Connection } from './Connection'; -import { sleep } from './fn'; -(async () => { - const connection = new Connection({ - dsn: 'amqps://doehrbmi:1sfqvtXmdi8MCz0xOJ80r-6utLBjfj24@moose.rmq.cloudamqp.com/doehrbmi', - exchange: 'xpto', - connectionName: 'yay', - logLevel: 'debug', +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +const connection = new Connection({ + dsn: 'amqp://guest:guest@rabbitmq/test', + exchange: 'test', + connectionName: 'test', +}); + +let count = 0; +connection + .queue('events') + .topic('event.sent') + .durable(true) + .prefetch(100) + .retryTimeout(1) + .deadLetterAfter(5000) + .listen(async (message) => { + count++; + console.log('count:', count); + throw new Error('error'); }); - try { - await connection.connect(); - - await connection - .queue('adasdasdqewwq') - .topic('xpto') - .listen(async (payload) => { - console.log('RECEIVED', payload); - await sleep(1000); - - return true; - }); - - await connection.delayQueue('myNiceDelayQueue').timeout(30000).from('xpto.from').to('xpto').create(); - - const publisher = connection.topic('xpto').persistent(); - - let id = 0; - - setInterval(async () => { - await publisher.send({ - payload: { x: ++id }, - }); - }, 1000); - } catch (err) { - console.log(err); - } +(async () => { + const publisher = connection.topic('event.sent').persistent(); + console.log('sending message'); + await sleep(3000); + await publisher.send({ payload: 'message' }); })(); + +console.log('started');