Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/generators/channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ These are the available options for the `channels` generator;
| eventSourceDependency | `'@microsoft/fetch-event-source'` | String | Because @microsoft/fetch-event-source is out-dated in some areas we allow you to change the fork/variant that can be used instead |

## TypeScript

Regardless of protocol, these are the dependencies:
- If validation enabled, [ajv](https://ajv.js.org/guide/getting-started.html): ^8.17.1

Depending on which protocol, these are the dependencies:
- `NATS`: https://github.com/nats-io/nats.js v2
- `Kafka`: https://github.com/tulios/kafkajs v2
Expand All @@ -58,4 +60,4 @@ const { jetStreamPublishTo..., jetStreamPullSubscribeTo..., jetStreamPushSubscri

First we import the generated file, which is located based on your `outputPath` in the generator options.

Next we import the desired protocol and then we have access to all the support functions. These support functions are an easy way to interact with channels defined in your AsyncAPI document. Take notice it does not care which operations you have defined.
Next we import the desired protocol and then we have access to all the support functions. These support functions are an easy way to interact with channels defined in your AsyncAPI document. Take notice it does not care which operations you have defined.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@
"bump:version": "npm --no-git-tag-version --allow-same-version version $VERSION",
"runtime:prepare": "npm link",
"runtime:typescript": "npm run runtime:typescript:setup && npm run runtime:typescript:test",
"runtime:typescript:setup": "npm run runtime:prepare && npm run runtime:services:start && cd test/runtime/typescript && npm ci && npm run generate",
"runtime:typescript:setup": "npm run runtime:prepare && npm run runtime:services:start && npm run runtime:typescript:generate",
"runtime:typescript:generate": "cd test/runtime/typescript && npm ci && npm run generate",
"runtime:typescript:test": "cd test/runtime/typescript && npm run test",
"runtime:services:start": "npm run runtime:nats:start && npm run runtime:kafka:start && npm run runtime:mqtt:start && npm run runtime:amqp:start",
"runtime:services:stop": "npm run runtime:nats:stop && npm run runtime:kafka:stop && npm run runtime:mqtt:stop && npm run runtime:amqp:stop",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ export async function generateAmqpChannels(
channelParameters: parameter,
topic,
messageType: '',
subName: context.subName
subName: context.subName,
payloadGenerator: context.payloads
};

const operations = channel.operations().all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ export async function generateEventSourceChannels(
channelParameters: parameter,
topic,
messageType: '',
subName: context.subName
subName: context.subName,
payloadGenerator: context.payloads
};

const operations = channel.operations().all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ export async function generateKafkaChannels(
channelParameters: parameter,
topic: kafkaTopic,
messageType: '',
subName: context.subName
subName: context.subName,
payloadGenerator: context.payloads
};

const operations = channel.operations().all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ export async function generateMqttChannels(
channelParameters: parameter,
topic,
subName: context.subName,
messageType: ''
messageType: '',
payloadGenerator: context.payloads
};
let renders = [];
const operations = channel.operations().all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import {ChannelFunctionTypes, RenderRequestReplyParameters} from '../../types';
import {SingleFunctionRenderType} from '../../../../../types';
import {findRegexFromChannel, pascalCase} from '../../../utils';
import {getValidationFunctions} from '../../utils';

export function renderCoreReply({
requestTopic,
Expand All @@ -12,8 +13,10 @@ export function renderCoreReply({
replyMessageModule,
channelParameters,
subName = pascalCase(requestTopic),
payloadGenerator,
functionName = `replyTo${subName}`
}: RenderRequestReplyParameters): SingleFunctionRenderType {
const includeValidation = payloadGenerator.generator.includeValidation;
const addressToUse = channelParameters
? `parameters.getChannelWithParameters('${requestTopic}')`
: `'${requestTopic}'`;
Expand All @@ -22,6 +25,17 @@ export function renderCoreReply({
? `${requestMessageModule}.${requestMessageType}`
: requestMessageType;
const replyType = replyMessageModule ?? replyMessageType;

const {potentialValidatorCreation, potentialValidationFunction} =
getValidationFunctions({
includeValidation,
messageModule: requestMessageModule,
messageType: requestMessageType,
onValidationFail: channelParameters
? `onDataCallback(new Error('Invalid request payload received', {cause: errors}), undefined, parameters); continue;`
: `onDataCallback(new Error('Invalid request payload received', {cause: errors}), undefined); continue;`
});

const callbackFunctionParameters = [
{
parameter: 'err?: Error',
Expand Down Expand Up @@ -66,11 +80,16 @@ export function renderCoreReply({
{
parameter: 'options?: Nats.SubscriptionOptions',
jsDoc: ' * @param options when setting up the reply'
},
{
parameter: 'skipMessageValidation: boolean = false',
jsDoc:
' * @param skipMessageValidation turn off runtime validation of incoming messages'
}
];

//Determine the receiving process based on message payload type
const receivingOperation = `let receivedData : any = codec.decode(msg.data);
${potentialValidationFunction}
const replyMessage = await onDataCallback(undefined, ${requestMessageModule ?? requestMessageType}.unmarshal(receivedData) ${channelParameters ? ', parameters ?? undefined' : ''});`;

const replyOperation = `let dataToSend : any = replyMessage.marshal();
Expand Down Expand Up @@ -102,6 +121,7 @@ ${functionName}: (
return new Promise(async (resolve, reject) => {
try {
let subscription = nc.subscribe(${addressToUse}, options);
${potentialValidatorCreation}
(async () => {
for await (const msg of subscription) {
${channelParameters ? `const parameters = ${channelParameters.type}.createFromChannel(msg.subject, '${requestTopic}', ${findRegexFromChannel(requestTopic)})` : ''}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import {ChannelFunctionTypes, RenderRequestReplyParameters} from '../../types';
import {SingleFunctionRenderType} from '../../../../../types';
import {pascalCase} from '../../../utils';
import {getValidationFunctions} from '../../utils';

export function renderCoreRequest({
requestTopic,
Expand All @@ -12,21 +13,33 @@ export function renderCoreRequest({
replyMessageModule,
channelParameters,
subName = pascalCase(requestTopic),
payloadGenerator,
functionName = `requestTo${subName}`
}: RenderRequestReplyParameters): SingleFunctionRenderType {
const includeValidation = payloadGenerator.generator.includeValidation;
const addressToUse = channelParameters
? `parameters.getChannelWithParameters('${requestTopic}')`
: `'${requestTopic}'`;
const messageType = requestMessageModule

const requestType = requestMessageModule
? `${requestMessageModule}.${requestMessageType}`
: requestMessageType;
const replyType = replyMessageModule
? `${replyMessageModule}.${replyMessageType}`
: replyMessageType;

const {potentialValidatorCreation, potentialValidationFunction} =
getValidationFunctions({
includeValidation,
messageModule: replyMessageModule,
messageType: replyMessageType,
onValidationFail: `return reject(new Error('Invalid message payload received', {cause: errors}));`
});

const functionParameters = [
{
parameter: `requestMessage: ${messageType}`,
jsDoc: ' * @param requestMessage to make the request with'
parameter: `requestMessage: ${requestType}`,
jsDoc: ` * @param requestMessage the message to send`
},
...(channelParameters
? [
Expand All @@ -38,56 +51,54 @@ export function renderCoreRequest({
: []),
{
parameter: 'nc: Nats.NatsConnection',
jsDoc: ' * @param nc the NATS client to make the request through'
jsDoc: ' * @param nc the NATS client to send the request through'
},
{
parameter: 'codec: any = Nats.JSONCodec()',
jsDoc:
' * @param codec the serialization codec to use when sending the request and receiving the reply'
' * @param codec the serialization codec to use when sending and receiving the message'
},
{
parameter: 'options?: Nats.RequestOptions',
jsDoc: ' * @param options when making the request'
jsDoc: ' * @param options when sending the request'
},
{
parameter: 'skipMessageValidation: boolean = false',
jsDoc:
' * @param skipMessageValidation turn off runtime validation of outgoing messages'
}
];

//Determine the request operation based on whether the message type is null
let requestMessageMarshalling = 'requestMessage.marshal()';
if (requestMessageModule) {
requestMessageMarshalling = `${requestMessageModule}.marshal(requestMessage)`;
}
const requestOperation = `let dataToSend: any = codec.encode(${requestMessageMarshalling});
const msg = await nc.request(${addressToUse}, dataToSend, options)`;

//Determine the request callback operation based on message type
const requestCallbackOperation = `let receivedData = codec.decode(msg.data);
const unmarshalData = ${replyMessageModule ?? replyMessageType}.unmarshal(receivedData);
resolve(unmarshalData);`;

const jsDocParameters = functionParameters
.map((param) => param.jsDoc)
.join('\n');

const code = `/**
* Request to \`${requestTopic}\`
* Core request for \`${requestTopic}\`
*
${jsDocParameters}
*/
${functionName}: (
${functionParameters.map((param) => param.parameter).join(', ')}
${functionParameters.map((param) => param.parameter).join(', \n ')}
): Promise<${replyType}> => {
return new Promise(async (resolve, reject) => {
try {
${requestOperation}
${requestCallbackOperation}
${potentialValidatorCreation}
let dataToSend: any = requestMessage.marshal();
dataToSend = codec.encode(dataToSend);

const msg = await nc.request(${addressToUse}, dataToSend, options);
const receivedData: any = codec.decode(msg.data);
${potentialValidationFunction}
resolve(${replyMessageModule ? `${replyMessageModule}.unmarshal(receivedData)` : `${replyMessageType}.unmarshal(receivedData)`});
} catch (e: any) {
reject(e);
}
});
}`;

return {
messageType,
messageType: requestType,
replyType,
code,
functionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@ import {ChannelFunctionTypes} from '../..';
import {SingleFunctionRenderType} from '../../../../../types';
import {findRegexFromChannel, pascalCase} from '../../../utils';
import {RenderRegularParameters} from '../../types';
import {getValidationFunctions} from '../../utils';

export function renderCoreSubscribe({
topic,
messageType,
messageModule,
channelParameters,
subName = pascalCase(topic),
payloadGenerator,
functionName = `subscribeTo${subName}`
}: RenderRegularParameters): SingleFunctionRenderType {
const includeValidation = payloadGenerator.generator.includeValidation;
const addressToUse = channelParameters
? `parameters.getChannelWithParameters('${topic}')`
: `'${topic}'`;
let messageUnmarshalling = `${messageType}.unmarshal(receivedData)`;
if (messageModule) {
messageUnmarshalling = `${messageModule}.unmarshal(receivedData)`;
}

const {potentialValidatorCreation, potentialValidationFunction} =
getValidationFunctions({
includeValidation,
messageModule,
messageType,
onValidationFail: channelParameters
? `onDataCallback(new Error('Invalid message payload received', {cause: errors}), undefined, parameters, msg); continue;`
: `onDataCallback(new Error('Invalid message payload received', {cause: errors}), undefined, msg); continue;`
});

messageType = messageModule ? `${messageModule}.${messageType}` : messageType;

const callbackFunctionParameters = [
Expand Down Expand Up @@ -70,19 +84,29 @@ export function renderCoreSubscribe({
{
parameter: 'options?: Nats.SubscriptionOptions',
jsDoc: ' * @param options when setting up the subscription'
},
{
parameter: 'skipMessageValidation: boolean = false',
jsDoc:
' * @param skipMessageValidation turn off runtime validation of incoming messages'
}
];

const whenReceivingMessage = channelParameters
? messageType === 'null'
? `onDataCallback(undefined, null, parameters, msg);`
: `let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageUnmarshalling}, parameters, msg);`
: messageType === 'null'
? `onDataCallback(undefined, null, msg);`
: `let receivedData: any = codec.decode(msg.data);
let whenReceivingMessage = '';
if (channelParameters) {
if (messageType === 'null') {
whenReceivingMessage = `onDataCallback(undefined, null, parameters, msg);`;
} else {
whenReceivingMessage = `let receivedData: any = codec.decode(msg.data);
${potentialValidationFunction}
onDataCallback(undefined, ${messageUnmarshalling}, parameters, msg);`;
}
} else if (messageType === 'null') {
whenReceivingMessage = `onDataCallback(undefined, null, msg);`;
} else {
whenReceivingMessage = `let receivedData: any = codec.decode(msg.data);
${potentialValidationFunction}
onDataCallback(undefined, ${messageUnmarshalling}, msg);`;

}
const jsDocParameters = functionParameters
.map((param) => param.jsDoc)
.join('\n');
Expand All @@ -94,21 +118,21 @@ onDataCallback(undefined, ${messageUnmarshalling}, msg);`;
* Callback for when receiving messages
*
* @callback ${functionName}Callback
${callbackJsDocParameters}
${callbackJsDocParameters}
*/

/**
* Core subscription for \`${topic}\`
*
${jsDocParameters}
${jsDocParameters}
*/
${functionName}: (
${functionParameters.map((param) => param.parameter).join(', ')}
${functionParameters.map((param) => param.parameter).join(', \n ')}
): Promise<Nats.Subscription> => {
return new Promise(async (resolve, reject) => {
try {
const subscription = nc.subscribe(${addressToUse}, options);

${potentialValidatorCreation}
(async () => {
for await (const msg of subscription) {
${channelParameters ? `const parameters = ${channelParameters.type}.createFromChannel(msg.subject, '${topic}', ${findRegexFromChannel(topic)})` : ''}
Expand Down
Loading