Skip to content

Commit 62f4625

Browse files
committed
outbox pattern
1 parent 789f99d commit 62f4625

12 files changed

Lines changed: 351 additions & 36 deletions

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Prisma } from "generated/prisma/client";
2+
3+
export interface OutboxEvent {
4+
id: string;
5+
topic: string;
6+
payload: string;
7+
status: OutboxStatus;
8+
error: string | null;
9+
attempts: number;
10+
processedAt: Date | null;
11+
createdAt: Date;
12+
updatedAt: Date | null;
13+
}
14+
15+
export enum OutboxStatus {
16+
PENDING = 'PENDING',
17+
PROCESSING = 'PROCESSING',
18+
PROCESSED = 'PROCESSED',
19+
FAILED = 'FAILED'
20+
}
21+
22+
export type CreateOutboxEventData = Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt' | 'processedAt' | 'attempts' | 'error'>;
23+
export type UpdateOutboxEventData = Partial<Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt'>>;
24+
25+
export interface OutboxRepository {
26+
findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T): Promise<OutboxEvent[] | null>;
27+
create(user: CreateOutboxEventData): Promise<OutboxEvent>;
28+
update(id: string, user: UpdateOutboxEventData): Promise<OutboxEvent>;
29+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { Injectable, Inject } from '@nestjs/common';
2+
import { Cron } from '@nestjs/schedule';
3+
import { ClientProxy } from '@nestjs/microservices';
4+
import { lastValueFrom } from 'rxjs';
5+
import { PrismaOutboxRepository } from './outbox.service';
6+
import { OutboxStatus } from '../domain/outbox.interface';
7+
8+
@Injectable()
9+
export class OutboxProcessor {
10+
private isProcessing = false;
11+
12+
constructor(
13+
private readonly outbox: PrismaOutboxRepository,
14+
@Inject('EMAIL_SERVICE') private client: ClientProxy,
15+
) {}
16+
17+
@Cron('*/5 * * * * *')
18+
async handle() {
19+
if (this.isProcessing) return // Prevent overlapping if the previous one hasn't finished
20+
21+
this.isProcessing = true
22+
23+
try {
24+
const messages = await this.outbox.findManyBy({ status: OutboxStatus.PENDING });
25+
for (const message of messages) {
26+
try {
27+
const {topic, payload } = message
28+
await lastValueFrom(this.client.emit(topic, payload))
29+
await this.outbox.update(message.id, { status: OutboxStatus.PROCESSED, processedAt: new Date() })
30+
} catch (error) {
31+
await this.outbox.update(message.id, {
32+
error: error.message,
33+
attempts: message.attempts + 1,
34+
status: message.attempts >= 3 ? OutboxStatus.FAILED : OutboxStatus.PENDING
35+
})
36+
}
37+
}
38+
} finally {
39+
this.isProcessing = false
40+
}
41+
}
42+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { PrismaService } from '../../prisma/prisma.service';
3+
import { CreateOutboxEventData, OutboxRepository, UpdateOutboxEventData} from '../domain/outbox.interface';
4+
import { Prisma } from 'generated/prisma/client';
5+
6+
@Injectable()
7+
export class PrismaOutboxRepository implements OutboxRepository {
8+
9+
constructor(
10+
private prisma: PrismaService,
11+
) {}
12+
13+
async create(data: CreateOutboxEventData) {
14+
return this.prisma.outboxEvent.create({
15+
data
16+
})
17+
}
18+
19+
async update(id: string, data: UpdateOutboxEventData) {
20+
return this.prisma.outboxEvent.update({
21+
where: { id },
22+
data
23+
})
24+
}
25+
26+
async findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T) {
27+
return this.prisma.outboxEvent.findMany({
28+
where: query,
29+
take: 20,
30+
orderBy: { createdAt: 'asc' }
31+
})
32+
}
33+
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { Module } from '@nestjs/common';
2+
import { PrismaOutboxRepository } from './infrastructure/outbox.service';
3+
import { PrismaService } from '../prisma/prisma.service';
4+
import { ClientsModule, Transport } from '@nestjs/microservices';
5+
6+
@Module({
7+
imports: [
8+
ClientsModule.register([
9+
{
10+
name: 'EMAIL_SERVICE',
11+
transport: Transport.RMQ,
12+
options: {
13+
urls: [process.env.RABBITMQ_URL || 'amqp://localhost:5672'],
14+
queue: 'email_queue',
15+
queueOptions: {
16+
durable: true,
17+
},
18+
},
19+
},
20+
]),
21+
],
22+
providers: [PrismaOutboxRepository, PrismaService],
23+
exports: [PrismaOutboxRepository],
24+
})
25+
export class OutboxModule {}

apps/superchef/src/prisma/prisma.service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export class PrismaService implements OnModuleInit {
8888
return this.prisma.refreshToken;
8989
}
9090

91+
get outboxEvent() {
92+
return this.prisma.outboxEvent;
93+
}
94+
9195
async onModuleInit() {
9296
await this.prisma.$connect();
9397
}

apps/superchef/src/users/application/create-user.usecase.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,17 @@ export class CreateUserUsecase {
3535
user = await this.user.create(payload);
3636
this.logger.log(`User created: ${user.email}`);
3737

38-
const emailPayload = {
38+
/* const emailPayload = {
3939
name: data.name,
4040
to: data.email,
4141
subject: 'Welcome to superchef!',
4242
body: `
4343
Thank you for registering at superchef.
4444
We are excited to have you on board!
4545
`,
46-
};
46+
}; */
4747

48-
this.client.emit('user_registered', emailPayload);
48+
// this.client.emit('user_registered', emailPayload);
4949

5050
return user;
5151
} catch (error) {

apps/superchef/src/users/infrastructure/prisma-user.repository.ts

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { UpdateUserData } from '../domain/user.interface';
1010
import { CreateUserData } from '../domain/user.interface';
1111
import { PlanNotFoundException } from '@/common/exceptions/plan-not-found.exception';
1212
import { UserNotFoundException } from '@/common/exceptions/user-not-found.exception';
13-
import { Prisma } from 'generated/prisma/edge';
13+
import { Prisma, OutboxStatus } from 'generated/prisma/edge';
1414

1515
@Injectable()
1616
export class UserRepositoryImpl implements UserRepository {
@@ -99,38 +99,58 @@ export class UserRepositoryImpl implements UserRepository {
9999
throw new PlanNotFoundException();
100100
}
101101

102-
const user = await this.prisma.user.create({
103-
data: {
104-
name,
105-
email,
106-
username,
107-
password,
108-
...(preferences && { preferences }),
109-
roles: {
110-
create:
111-
roles?.map((role) => ({
112-
role: {
113-
connect: { name: role },
114-
},
115-
})) || [],
116-
},
117-
subscription: {
118-
create: subscription
119-
? {
120-
status: 'active',
121-
stripeSubscriptionId: '',
122-
currentPeriodEnd: null,
123-
plan: {
124-
connect: {
125-
id: plan!.id,
126-
name: subscription,
102+
const user = await this.prisma.$transaction(async (prisma) => {
103+
const u = await this.prisma.user.create({
104+
data: {
105+
name,
106+
email,
107+
username,
108+
password,
109+
...(preferences && { preferences }),
110+
roles: {
111+
create:
112+
roles?.map((role) => ({
113+
role: {
114+
connect: { name: role },
127115
},
128-
},
129-
}
130-
: undefined,
131-
},
132-
},
133-
});
116+
})) || [],
117+
},
118+
subscription: {
119+
create: subscription
120+
? {
121+
status: 'active',
122+
stripeSubscriptionId: '',
123+
currentPeriodEnd: null,
124+
plan: {
125+
connect: {
126+
id: plan!.id,
127+
name: subscription,
128+
},
129+
},
130+
}
131+
: undefined,
132+
},
133+
},
134+
});
135+
136+
await prisma.outboxEvent.create({
137+
data: {
138+
topic: 'user_registered',
139+
payload: JSON.stringify({
140+
name: u.name,
141+
to: u.email,
142+
subject: 'Welcome to superchef!',
143+
body: `
144+
Thank you for registering at superchef.
145+
We are excited to have you on board!
146+
`,
147+
}),
148+
status: OutboxStatus.PENDING,
149+
},
150+
});
151+
152+
return u;
153+
})
134154

135155
return user;
136156
}

compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
services:
2+
23
postgres:
34
image: postgres:15-alpine
45
container_name: superchef_db

0 commit comments

Comments
 (0)