Skip to content

Commit 6f50486

Browse files
committed
outbox pattern
1 parent 789f99d commit 6f50486

17 files changed

Lines changed: 444 additions & 54 deletions

File tree

README.md

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
6. [Caching (Redis)](#caching-redis)
1515
7. [User Preferences](#user-preferences)
1616
8. [AI Recipe Assistant](#ai-recipe-assistant)
17-
9. [Project setup](#project-setup)
18-
10. [Environment variables](#environment-variables)
19-
11. [Compile and run the project](#compile-and-run-the-project)
20-
12. [Run tests](#run-tests)
21-
13. [Deployment](#deployment)
22-
14. [License](#license)
17+
9. [Analytics](#bar_chart-analytics)
18+
10.[Database Transactional Outbox Pattern](#database-transactional-outbox-pattern)
19+
11. [Project setup](#project-setup)
20+
12. [Environment variables](#environment-variables)
21+
13. [Compile and run the project](#compile-and-run-the-project)
22+
14. [Run tests](#run-tests)
23+
15. [Deployment](#deployment)
24+
16. [License](#license)
2325

2426

2527
## SuperChef
@@ -265,6 +267,52 @@ Sample response:
265267
]
266268
```
267269

270+
## Database Transactional Outbox Pattern
271+
272+
When a user is created in the system two things happen:
273+
274+
1. A new record is created in the users table respectively.
275+
2. A welcome email is dispatched to the user's inbox.
276+
277+
This alone, could be a problem and lead to data inconsistencies, for example, it could be the case that the user is created successfully in our database but the message broker for whatever reason goes down precisely at that moment and the message is not delivered.
278+
279+
How do we mitigate this ?
280+
281+
By adding a transactional outbox pattern, this is how it works:
282+
283+
1. There's a table holding the outbox events:
284+
285+
```typescript
286+
model OutboxEvent {
287+
id String @id @default(uuid())
288+
topic String
289+
payload Json
290+
status OutboxStatus @default(PENDING)
291+
error String?
292+
attempts Int @default(0)
293+
createdAt DateTime @default(now()) @map("created_at")
294+
updatedAt DateTime @default(now()) @map("updated_at")
295+
296+
@@index([status, createdAt])
297+
@@map("outbox_event")
298+
}
299+
```
300+
301+
Creating the user is done by means of a transaction, this transaction involves updating both the user table and the outbox_event table.
302+
303+
2. There's a cron job checking the outbox_event table every `5` seconds.
304+
305+
- If there's a `PENDING` event it will try to send it to the message broker so it can be processed.
306+
- If the processing fails and the third attempt hasn't been reached the status remains `PENDING` and the attempts count bumps up.
307+
- If it's the third attempt and the processing fails the status is updated to `FAILED`.
308+
309+
You can check the entire flow in the image below.
310+
311+
<p align="center">
312+
<img src="./outbox.png" alt="superchef" />
313+
</p>
314+
315+
268316
## :credit_card: Stripe Integration
269317
270318
Superchef integrates with Stripe for subscription management, allowing users to subscribe to a basic plan and access enhanced features.

apps/rabbitmq/src/controllers/rabbitmq.controller.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Controller, Logger, UseFilters } from '@nestjs/common';
22
import { EventPattern, Ctx, RmqContext } from '@nestjs/microservices';
33
import type { ChannelModel, ConsumeMessage } from 'amqplib';
4-
import { UserRegisteredPayload } from '../domain/email.interface';
4+
import { Email, UserRegisteredPayload } from '../domain/email.interface';
55
import { SendMailUsecase } from '../application/send-mail.usecase';
66
import { RmqErrorFilter } from '../infrastructure/filters/rmq.error.filter';
77

@@ -18,15 +18,17 @@ export class RabbitmqController {
1818
const message = context.getMessage() as ConsumeMessage;
1919

2020
let payload: UserRegisteredPayload;
21+
let data : Email
2122
try {
2223
payload = JSON.parse(message.content.toString()) as UserRegisteredPayload;
24+
data = JSON.parse(String(payload.data)) as Email
2325
} catch (err) {
2426
this.logger.error('Failed to parse message content', err as Error);
2527
channel.nack(message);
2628
return;
2729
}
2830

29-
await this.email.send(payload.data);
31+
await this.email.send(data);
3032

3133
channel.ack(message);
3234
this.logger.log('Email sent successfully');

apps/superchef/src/app.module.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ import { CacheModule } from '@nestjs/cache-manager';
1717
import { KafkaModule } from './kafka.module';
1818
import { AnalyticsModule } from './analytics/analytics.module';
1919
import { HealthModule } from './health/health.module';
20+
import { ScheduleModule } from '@nestjs/schedule';
21+
import { OutboxModule } from './outbox/outbox.module';
2022

2123
@Module({
2224
imports: [
25+
ScheduleModule.forRoot(),
2326
KafkaModule,
2427
CacheModule.register({ isGlobal: true }),
2528
StripeModule.forRootAsync(),
@@ -44,6 +47,7 @@ import { HealthModule } from './health/health.module';
4447
CheckoutModule,
4548
AnalyticsModule,
4649
HealthModule,
50+
OutboxModule,
4751
],
4852
controllers: [AppController],
4953
providers: [
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Prisma } from "generated/prisma/client";
2+
3+
export interface OutboxEvent {
4+
id: string;
5+
topic: string;
6+
payload: string;
7+
status: OutboxStatus;
8+
error: string | null;
9+
attempts: number;
10+
createdAt: Date;
11+
updatedAt: Date | null;
12+
}
13+
14+
export enum OutboxStatus {
15+
PENDING = 'PENDING',
16+
PROCESSING = 'PROCESSING',
17+
PROCESSED = 'PROCESSED',
18+
FAILED = 'FAILED'
19+
}
20+
21+
export type CreateOutboxEventData = Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt' | 'attempts' | 'error'>;
22+
export type UpdateOutboxEventData = Partial<Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt'>>;
23+
24+
export interface OutboxRepository {
25+
findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T): Promise<OutboxEvent[] | null>;
26+
create(user: CreateOutboxEventData): Promise<OutboxEvent>;
27+
update(id: string, user: UpdateOutboxEventData): Promise<OutboxEvent>;
28+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common';
2+
import { Cron, CronExpression } from '@nestjs/schedule';
3+
import { ClientProxy } from '@nestjs/microservices';
4+
import { firstValueFrom, timeout, catchError, of } from 'rxjs';
5+
import { PrismaOutboxRepository } from './outbox.service';
6+
import { OutboxStatus } from '../domain/outbox.interface';
7+
8+
@Injectable()
9+
export class OutboxProcessor implements OnModuleInit {
10+
private isProcessing = false;
11+
private readonly logger = new Logger(OutboxProcessor.name);
12+
13+
constructor(
14+
private readonly outbox: PrismaOutboxRepository,
15+
@Inject('EMAIL_SERVICE') private client: ClientProxy,
16+
) {}
17+
18+
async onModuleInit() {
19+
await this.client.connect();
20+
}
21+
22+
@Cron(CronExpression.EVERY_30_SECONDS)
23+
async handle() {
24+
if (this.isProcessing) return // Prevent overlapping if the previous one hasn't finished
25+
26+
this.isProcessing = true
27+
28+
try {
29+
this.logger.debug('Processing outbox events...');
30+
const messages = await this.outbox.findManyBy({ status: OutboxStatus.PENDING });
31+
if (!messages?.length) {
32+
this.logger.debug('No pending outbox events found.');
33+
return;
34+
}
35+
for (const message of messages) {
36+
try {
37+
const {topic, payload } = message
38+
39+
// Use firstValueFrom to wait for the emit to complete
40+
// emit() completes when the message is sent to the broker
41+
await firstValueFrom(
42+
this.client.emit(topic, payload).pipe(
43+
timeout(5000),
44+
catchError((err) => {
45+
this.logger.error(`Emit failed for ${message.id}:`, err);
46+
throw err;
47+
})
48+
)
49+
);
50+
51+
this.logger.log(`Successfully emitted event ${message.id} to topic: ${topic}`);
52+
await this.outbox.update(message.id, { status: OutboxStatus.PROCESSED })
53+
} catch (error) {
54+
const errorMessage = error?.message || String(error) || 'Unknown error';
55+
this.logger.error(`Failed to process outbox event ${message.id}:`, JSON.stringify(errorMessage));
56+
await this.outbox.update(message.id, {
57+
error: errorMessage,
58+
attempts: message.attempts + 1,
59+
status: message.attempts >= 3 ? OutboxStatus.FAILED : OutboxStatus.PENDING
60+
})
61+
}
62+
}
63+
} finally {
64+
this.isProcessing = false
65+
}
66+
}
67+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { PrismaService } from '../../prisma/prisma.service';
3+
import { CreateOutboxEventData, OutboxEvent, OutboxRepository, UpdateOutboxEventData} from '../domain/outbox.interface';
4+
import { Prisma } from 'generated/prisma/client';
5+
6+
@Injectable()
7+
export class PrismaOutboxRepository implements OutboxRepository {
8+
9+
constructor(
10+
private prisma: PrismaService,
11+
) {}
12+
13+
async create(data: CreateOutboxEventData) : Promise<OutboxEvent> {
14+
return this.prisma.outboxEvent.create({
15+
data
16+
})
17+
}
18+
19+
async update(id: string, data: UpdateOutboxEventData) : Promise<OutboxEvent> {
20+
return this.prisma.outboxEvent.update({
21+
where: { id },
22+
data
23+
})
24+
}
25+
26+
async findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T) : Promise<OutboxEvent[] | null> {
27+
return this.prisma.outboxEvent.findMany({
28+
where: query,
29+
take: 20,
30+
orderBy: { createdAt: 'asc' }
31+
})
32+
}
33+
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { Module } from '@nestjs/common';
2+
import { PrismaOutboxRepository } from './infrastructure/outbox.service';
3+
import { PrismaService } from '../prisma/prisma.service';
4+
import { ClientsModule, Transport } from '@nestjs/microservices';
5+
import { OutboxProcessor } from './infrastructure/outbox.processor';
6+
7+
@Module({
8+
imports: [
9+
ClientsModule.register([
10+
{
11+
name: 'EMAIL_SERVICE',
12+
transport: Transport.RMQ,
13+
options: {
14+
urls: [process.env.RABBITMQ_URL || 'amqp://user:password@localhost:5672'],
15+
queue: 'rabbitmq',
16+
queueOptions: {
17+
durable: false,
18+
arguments: {
19+
'prefetch-count': 1,
20+
'x-message-ttl': 60000,
21+
'x-max-priority': 10,
22+
'x-dead-letter-exchange': 'dead_letter_exchange',
23+
'x-dead-letter-routing-key': 'dead_letter',
24+
},
25+
},
26+
noAck: true,
27+
},
28+
},
29+
]),
30+
],
31+
providers: [PrismaOutboxRepository, PrismaService, OutboxProcessor],
32+
exports: [PrismaOutboxRepository],
33+
})
34+
export class OutboxModule {}

apps/superchef/src/prisma/prisma.service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export class PrismaService implements OnModuleInit {
8888
return this.prisma.refreshToken;
8989
}
9090

91+
get outboxEvent() {
92+
return this.prisma.outboxEvent;
93+
}
94+
9195
async onModuleInit() {
9296
await this.prisma.$connect();
9397
}

apps/superchef/src/users/application/create-user.usecase.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,6 @@ export class CreateUserUsecase {
3535
user = await this.user.create(payload);
3636
this.logger.log(`User created: ${user.email}`);
3737

38-
const emailPayload = {
39-
name: data.name,
40-
to: data.email,
41-
subject: 'Welcome to superchef!',
42-
body: `
43-
Thank you for registering at superchef.
44-
We are excited to have you on board!
45-
`,
46-
};
47-
48-
this.client.emit('user_registered', emailPayload);
49-
5038
return user;
5139
} catch (error) {
5240
this.logger.error(`Error creating user: ${data.email}`, error.stack);

0 commit comments

Comments
 (0)