Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,62 @@ import {ChannelFunctionTypes} from '../..';
import {SingleFunctionRenderType} from '../../../../../types';
import {pascalCase} from '../../../utils';
import {RenderRegularParameters} from '../../types';
import { getValidationFunctions } from '../../utils';

export function renderSubscribeQueue({
topic,
messageType,
messageModule,
channelParameters,
subName = pascalCase(topic),
functionName = `subscribeTo${subName}Queue`
functionName = `subscribeTo${subName}Queue`,
payloadGenerator
}: RenderRegularParameters): SingleFunctionRenderType {
const includeValidation = payloadGenerator.generator.includeValidation;
const addressToUse = channelParameters
? `parameters.getChannelWithParameters('${topic}')`
: `'${topic}'`;
const messageUnmarshalling = `${messageModule ?? messageType}.unmarshal(msg.content.toString())`;
const messageUnmarshalling = `${messageModule ?? messageType}.unmarshal(receivedData)`;
messageType = messageModule ? `${messageModule}.${messageType}` : messageType;

const {potentialValidatorCreation, potentialValidationFunction} =
getValidationFunctions({
includeValidation,
messageModule,
messageType,
onValidationFail: `onDataCallback(new Error('Invalid message payload received', {cause: errors}), undefined, msg); return;`
});
const subscribeOperation = `const channel = await amqp.createChannel();
const queue = ${addressToUse};
await channel.assertQueue(queue, { durable: true });
${potentialValidatorCreation}
channel.consume(queue, (msg) => {
if (msg !== null) {
const receivedData = msg.content.toString()
${potentialValidationFunction}
const message = ${messageUnmarshalling};
onMessage({message, amqpMsg: msg});
onDataCallback(undefined, message, msg);
}
}, options);`;

const callbackFunctionParameters = [
{
parameter: 'err?: Error',
jsDoc: ' * @param err if any error occurred this will be sat'
},
{
parameter: `msg?: ${messageType}`,
jsDoc: ' * @param msg that was received'
},
{
parameter: `amqpMsg?: Amqp.ConsumeMessage`,
jsDoc: ' * @param amqpMsg'
}
];
const functionParameters = [
{
parameter: `onMessage: (callback: {message: ${messageType}, amqpMsg: Amqp.ConsumeMessage}) => void`,
jsDoc: ' * @param onMessage callback to handle received messages'
parameter: `onDataCallback: (${callbackFunctionParameters.map((param) => param.parameter).join(', ')}) => void`,
jsDoc: ` * @param {${functionName}Callback} onDataCallback to call when messages are received`
},
...(channelParameters
? [
Expand All @@ -46,6 +73,11 @@ channel.consume(queue, (msg) => {
},
{
parameter: `options?: Amqp.Options.Consume`
},
{
parameter: 'skipMessageValidation: boolean = false',
jsDoc:
' * @param skipMessageValidation turn off runtime validation of incoming messages'
}
];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,24 +240,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options);
/**
* AMQP subscribe operation for queue \`user/signedup\`
*
* @param onMessage callback to handle received messages
* @param {subscribeToUserSignedupQueueCallback} onDataCallback to call when messages are received
* @param amqp the AMQP connection to receive from

* @param skipMessageValidation turn off runtime validation of incoming messages
*/
subscribeToUserSignedupQueue: (
onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void,
onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void,
amqp: Amqp.Connection,
options?: Amqp.Options.Consume
options?: Amqp.Options.Consume,
skipMessageValidation: boolean = false
): Promise<Amqp.Channel> => {
return new Promise(async (resolve, reject) => {
try {
const channel = await amqp.createChannel();
const queue = 'user/signedup';
await channel.assertQueue(queue, { durable: true });

channel.consume(queue, (msg) => {
if (msg !== null) {
const message = MessageTypeModule.unmarshal(msg.content.toString());
onMessage({message, amqpMsg: msg});
const receivedData = msg.content.toString()

const message = MessageTypeModule.unmarshal(receivedData);
onDataCallback(undefined, message, msg);
}
}, options);
resolve(channel);
Expand Down Expand Up @@ -670,24 +675,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options);
/**
* AMQP subscribe operation for queue \`user/signedup\`
*
* @param onMessage callback to handle received messages
* @param {subscribeToUserSignedupQueueCallback} onDataCallback to call when messages are received
* @param amqp the AMQP connection to receive from

* @param skipMessageValidation turn off runtime validation of incoming messages
*/
subscribeToUserSignedupQueue: (
onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void,
onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void,
amqp: Amqp.Connection,
options?: Amqp.Options.Consume
options?: Amqp.Options.Consume,
skipMessageValidation: boolean = false
): Promise<Amqp.Channel> => {
return new Promise(async (resolve, reject) => {
try {
const channel = await amqp.createChannel();
const queue = 'user/signedup';
await channel.assertQueue(queue, { durable: true });

channel.consume(queue, (msg) => {
if (msg !== null) {
const message = MessageTypeModule.unmarshal(msg.content.toString());
onMessage({message, amqpMsg: msg});
const receivedData = msg.content.toString()

const message = MessageTypeModule.unmarshal(receivedData);
onDataCallback(undefined, message, msg);
}
}, options);
resolve(channel);
Expand Down Expand Up @@ -1133,24 +1143,29 @@ channel.sendToQueue(queue, Buffer.from(dataToSend), options);
/**
* AMQP subscribe operation for queue \`/ping\`
*
* @param onMessage callback to handle received messages
* @param {subscribeToPingQueueCallback} onDataCallback to call when messages are received
* @param amqp the AMQP connection to receive from

* @param skipMessageValidation turn off runtime validation of incoming messages
*/
subscribeToPingQueue: (
onMessage: (callback: {message: MessageTypeModule.MessageType, amqpMsg: Amqp.ConsumeMessage}) => void,
onDataCallback: (err?: Error, msg?: MessageTypeModule.MessageType, amqpMsg?: Amqp.ConsumeMessage) => void,
amqp: Amqp.Connection,
options?: Amqp.Options.Consume
options?: Amqp.Options.Consume,
skipMessageValidation: boolean = false
): Promise<Amqp.Channel> => {
return new Promise(async (resolve, reject) => {
try {
const channel = await amqp.createChannel();
const queue = '/ping';
await channel.assertQueue(queue, { durable: true });

channel.consume(queue, (msg) => {
if (msg !== null) {
const message = MessageTypeModule.unmarshal(msg.content.toString());
onMessage({message, amqpMsg: msg});
const receivedData = msg.content.toString()

const message = MessageTypeModule.unmarshal(receivedData);
onDataCallback(undefined, message, msg);
}
}, options);
resolve(channel);
Expand Down
32 changes: 23 additions & 9 deletions test/runtime/typescript/test/channels/amqp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { UserSignedupParameters } from '../../src/parameters/UserSignedupParamet

describe('amqp', () => {
const testMessage = new UserSignedUp({displayName: 'test', email: 'test@test.dk'});
const invalidMessage = new UserSignedUp({displayName: 'test', email: '123'});
const testParameters = new UserSignedupParameters({myParameter: 'test', enumParameter: 'asyncapi'});
let connection;
beforeAll(async () => {
Expand All @@ -22,14 +23,10 @@ describe('amqp', () => {
it('should be able to publish to queue', () => {
// eslint-disable-next-line no-async-promise-executor
return new Promise<void>(async (resolve, reject) => {
const channel = await subscribeToReceiveUserSignedupQueue(({message, amqpMsg}) => {
if (message !== null) {
expect(message.marshal()).toEqual(testMessage.marshal());
channel.ack(amqpMsg);
resolve();
} else {
reject();
}
const channel = await subscribeToReceiveUserSignedupQueue((err, message, amqpMsg) => {
expect(message.marshal()).toEqual(testMessage.marshal());
channel.ack(amqpMsg);
resolve();
}, testParameters, connection, {noAck: true});
channel.on('error', (err) => {
reject(err);
Expand All @@ -39,6 +36,23 @@ describe('amqp', () => {
await publishToSendUserSignedupQueue(testMessage, testParameters, connection);
});
});
it('should be able to catch invalid message', () => {
// eslint-disable-next-line no-async-promise-executor
return new Promise<void>(async (resolve, reject) => {
const channel = await subscribeToReceiveUserSignedupQueue((err, message, amqpMsg) => {
expect(err).toBeDefined();
expect(err?.message).toEqual('Invalid message payload received');
expect(err?.cause).toBeDefined();
resolve()
}, testParameters, connection, {noAck: true});
channel.on('error', (err) => {
reject(err);
});
await channel.prefetch(1);

await publishToSendUserSignedupQueue(invalidMessage, testParameters, connection);
});
});
// TODO: cannot create exchange
// it('should be able to publish to exchange', () => {
// // eslint-disable-next-line no-async-promise-executor
Expand Down Expand Up @@ -70,7 +84,7 @@ describe('amqp', () => {
describe('without parameters', () => {
it('should be able to publish to queue', () => {
return new Promise<void>(async (resolve, reject) => {
const channel = await subscribeToNoParameterQueue(({message, amqpMsg}) => {
const channel = await subscribeToNoParameterQueue((err, message, amqpMsg) => {
if (message !== null) {
expect(message.marshal()).toEqual(testMessage.marshal());
channel.ack(amqpMsg);
Expand Down
2 changes: 1 addition & 1 deletion test/runtime/typescript/test/channels/nats.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ describe('nats', () => {
await subscriber.drain();
resolve();
} catch (error) {
reject(error); config
reject(error);
}
}, new UserSignedupParameters({ myParameter: '*', enumParameter: 'asyncapi' }), js, config);
js.publish(`user.signedup.${testParameters.myParameter}.${testParameters.enumParameter}`, incorrectPayload);
Expand Down