Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,42 @@ const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbr
const eventBridgeMock = mockClient(EventBridgeClient);
const sqsMock = mockClient(SQSClient);

const validCloudEvent = {
id: "123e4567-e89b-12d3-a456-426614174000",
source: "mock",
specversion: "1.0",
type: "data",
subject: "123e4567-e89b-12d3-a456-426614174001",
time: "2024-01-01T00:00:00Z",
datacontenttype: "application/json",
dataschema: "https://notify.nhs.uk/events/schemas/supplier-status/v1.json",
dataschemaversion: "1.0",
data: {
nhsNumber: "1234567890",
delayedFallback: false,
sendingGroupId: "group-1",
clientId: "client-1",
campaignId: "campaign-1",
supplierStatus: "active",
previousSupplierStatus: "inactive"
}
};

const invalidCloudEvent = {
// missing required fields
type: "data",
data: {}
};

const snsEvent = {
Records: [
{ Sns: { Message: JSON.stringify({ type: 'data', version: 1, source: 'mock', message: 'test' }) } }
{ Sns: { Message: JSON.stringify(validCloudEvent) } }
]
};

const snsEventInvalid = {
Records: [
{ Sns: { Message: JSON.stringify(invalidCloudEvent) } }
]
};

Expand All @@ -29,7 +62,7 @@ describe('SNS to EventBridge Lambda', () => {
test('Invalid event is sent to DLQ', async () => {
sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });

await handler(snsEvent);
await handler(snsEventInvalid);

expect(sqsMock.calls()).toHaveLength(1);
});
Expand All @@ -46,19 +79,16 @@ describe('SNS to EventBridge Lambda', () => {

expect(eventBridgeMock.calls()).toHaveLength(2);
expect(sqsMock.calls()).toHaveLength(1);
});
});

test('Throttling delays event processing', async () => {
process.env.THROTTLE_DELAY_MS = '500';
jest.useFakeTimers();

const startTime = Date.now();
const handlerPromise = handler(snsEvent);
jest.advanceTimersByTime(500);
await handlerPromise;
const endTime = Date.now();

expect(endTime - startTime).toBeGreaterThanOrEqual(500);
jest.useRealTimers();
});
});
});
230 changes: 127 additions & 103 deletions infrastructure/modules/eventpub/lambda/eventpub/src/index.js
Original file line number Diff line number Diff line change
@@ -1,103 +1,127 @@
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');

const eventBridge = new EventBridgeClient({});
const sqs = new SQSClient({});

const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
const DLQ_URL = process.env.DLQ_URL;
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
const MAX_RETRIES = 3;
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;

function validateEvent(event) {
// Test Event
// {
// "type":"data",
// "version":"0.1",
// "source":"manual",
// "detailtype":"testEvent",
// "message":"Hello World"
// }
const requiredFields = ['type', 'version', 'source', 'message'];
return requiredFields.every(field => event.hasOwnProperty(field));
}

async function sendToEventBridge(events, eventBusArn) {
// console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);

const failedEvents = [];
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
const entries = batch.map(event => ({
Source: 'custom.event',
DetailType: event.type,
Detail: JSON.stringify(event),
EventBusName: eventBusArn
}));

let attempts = 0;
while (attempts < MAX_RETRIES) {
try {
// console.info(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);

const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
response.FailedEntryCount && response.Entries.forEach((entry, idx) => {
if (entry.ErrorCode) {
console.warn(`Event failed with error: ${entry.ErrorCode}`);
failedEvents.push(batch[idx]);
}
});
break;
} catch (error) {
console.error(`EventBridge send error: ${error}`);

if (error.retryable) {
console.warn(`Retrying after backoff: attempt ${attempts + 1}`);
await new Promise(res => setTimeout(res, 2 ** attempts * 100));
attempts++;
} else {
failedEvents.push(...batch);
break;
}
}
}
}
return failedEvents;
}

async function sendToDLQ(events) {
console.warn(`Sending ${events.length} failed events to DLQ`);

for (const event of events) {
await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
}
}

exports.handler = async (snsEvent) => {
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);

if (THROTTLE_DELAY_MS > 0) {
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}

const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
const validEvents = records.filter(validateEvent);
const invalidEvents = records.filter(event => !validateEvent(event));

// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);

if (invalidEvents.length) await sendToDLQ(invalidEvents);

const dataEvents = validEvents.filter(event => event.type === 'data');
const controlEvents = validEvents.filter(event => event.type === 'control');

// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);

const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);

await sendToDLQ([...failedDataEvents, ...failedControlEvents]);
};
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');

const eventBridge = new EventBridgeClient({});
const sqs = new SQSClient({});

const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
const DLQ_URL = process.env.DLQ_URL;
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
const MAX_RETRIES = 3;
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;

function validateEvent(event) {
// CloudEvents v1.0 schema validation (supplier-status)
const requiredFields = [
'id',
'source',
'specversion',
'type',
'subject',
'time',
'datacontenttype',
'dataschema',
'dataschemaversion',
'data'
];
// Check top-level required fields
if (!requiredFields.every(field => event.hasOwnProperty(field))) {
return false;
}
// Check nested data object and its required fields
const dataRequiredFields = [
'nhsNumber',
'delayedFallback',
'sendingGroupId',
'clientId',
'campaignId',
'supplierStatus',
'previousSupplierStatus'
];
if (
typeof event.data !== 'object' ||
!dataRequiredFields.every(field => event.data.hasOwnProperty(field))
) {
return false;
}
return true;
}

async function sendToEventBridge(events, eventBusArn) {
// console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);

const failedEvents = [];
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
const entries = batch.map(event => ({
Source: 'custom.event',
DetailType: event.type,
Detail: JSON.stringify(event),
EventBusName: eventBusArn
}));

let attempts = 0;
while (attempts < MAX_RETRIES) {
try {
// console.info(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);

const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
response.FailedEntryCount && response.Entries.forEach((entry, idx) => {
if (entry.ErrorCode) {
console.warn(`Event failed with error: ${entry.ErrorCode}`);
failedEvents.push(batch[idx]);
}
});
break;
} catch (error) {
console.error(`EventBridge send error: ${error}`);

if (error.retryable) {
console.warn(`Retrying after backoff: attempt ${attempts + 1}`);
await new Promise(res => setTimeout(res, 2 ** attempts * 100));
attempts++;
} else {
failedEvents.push(...batch);
break;
}
}
}
}
return failedEvents;
}

async function sendToDLQ(events) {
console.warn(`Sending ${events.length} failed events to DLQ`);

for (const event of events) {
await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
}
}

exports.handler = async (snsEvent) => {
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);

if (THROTTLE_DELAY_MS > 0) {
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}

const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
const validEvents = records.filter(validateEvent);
const invalidEvents = records.filter(event => !validateEvent(event));

// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);

if (invalidEvents.length) await sendToDLQ(invalidEvents);

const dataEvents = validEvents.filter(event => event.type === 'data');
const controlEvents = validEvents.filter(event => event.type === 'control');

// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);

const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);

await sendToDLQ([...failedDataEvents, ...failedControlEvents]);
};
Loading