Skip to content

Commit 387db9d

Browse files
committed
outbox pattern
1 parent 789f99d commit 387db9d

10 files changed

Lines changed: 296 additions & 1 deletion

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Prisma } from "generated/prisma/client";
2+
3+
export interface OutboxEvent {
4+
id: string;
5+
topic: string;
6+
payload: string;
7+
status: OutboxStatus;
8+
error: string | null;
9+
attempts: number;
10+
processedAt: Date | null;
11+
createdAt: Date;
12+
updatedAt: Date | null;
13+
}
14+
15+
export enum OutboxStatus {
16+
PENDING = 'PENDING',
17+
PROCESSING = 'PROCESSING',
18+
PROCESSED = 'PROCESSED',
19+
FAILED = 'FAILED'
20+
}
21+
22+
export type CreateOutboxEventData = Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt' | 'processedAt' | 'attempts' | 'error'>;
23+
export type UpdateOutboxEventData = Partial<Omit<OutboxEvent, 'id' | 'createdAt' | 'updatedAt'>>;
24+
25+
export interface OutboxRepository {
26+
findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T): Promise<OutboxEvent[] | null>;
27+
create(user: CreateOutboxEventData): Promise<OutboxEvent>;
28+
update(id: string, user: UpdateOutboxEventData): Promise<OutboxEvent>;
29+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { Injectable, Inject } from '@nestjs/common';
2+
import { Cron } from '@nestjs/schedule';
3+
import { ClientProxy } from '@nestjs/microservices';
4+
import { lastValueFrom } from 'rxjs';
5+
import { PrismaOutboxRepository } from './outbox.service';
6+
import { OutboxStatus } from '../domain/outbox.interface';
7+
8+
@Injectable()
9+
export class OutboxProcessor {
10+
private isProcessing = false;
11+
12+
constructor(
13+
private readonly outbox: PrismaOutboxRepository,
14+
@Inject('EMAIL_SERVICE') private client: ClientProxy,
15+
) {}
16+
17+
@Cron('*/5 * * * * *')
18+
async handle() {
19+
if (this.isProcessing) return // Prevent overlapping if the previous one hasn't finished
20+
21+
this.isProcessing = true
22+
23+
try {
24+
const messages = await this.outbox.findManyBy({ status: OutboxStatus.PENDING });
25+
for (const message of messages) {
26+
try {
27+
const {topic, payload } = message
28+
await lastValueFrom(this.client.emit(topic, payload))
29+
await this.outbox.update(message.id, { status: OutboxStatus.PROCESSED, processedAt: new Date() })
30+
} catch (error) {
31+
await this.outbox.update(message.id, {
32+
error: error.message,
33+
attempts: message.attempts + 1,
34+
status: message.attempts >= 3 ? OutboxStatus.FAILED : OutboxStatus.PENDING
35+
})
36+
}
37+
}
38+
} finally {
39+
this.isProcessing = false
40+
}
41+
}
42+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { PrismaService } from '../../prisma/prisma.service';
3+
import { CreateOutboxEventData, OutboxRepository, UpdateOutboxEventData} from '../domain/outbox.interface';
4+
import { Prisma } from 'generated/prisma/client';
5+
6+
@Injectable()
7+
export class PrismaOutboxRepository implements OutboxRepository {
8+
9+
constructor(
10+
private prisma: PrismaService,
11+
) {}
12+
13+
async create(data: CreateOutboxEventData) {
14+
return this.prisma.outboxEvent.create({
15+
data
16+
})
17+
}
18+
19+
async update(id: string, data: UpdateOutboxEventData) {
20+
return this.prisma.outboxEvent.update({
21+
where: { id },
22+
data
23+
})
24+
}
25+
26+
async findManyBy<T extends Prisma.OutboxEventWhereInput>(query: T) {
27+
return this.prisma.outboxEvent.findMany({
28+
where: query,
29+
take: 20,
30+
orderBy: { createdAt: 'asc' }
31+
})
32+
}
33+
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { Module } from '@nestjs/common';
2+
import { PrismaOutboxRepository } from './infrastructure/outbox.service';
3+
import { PrismaService } from '../prisma/prisma.service';
4+
import { ClientsModule, Transport } from '@nestjs/microservices';
5+
6+
@Module({
7+
imports: [
8+
ClientsModule.register([
9+
{
10+
name: 'EMAIL_SERVICE',
11+
transport: Transport.RMQ,
12+
options: {
13+
urls: [process.env.RABBITMQ_URL || 'amqp://localhost:5672'],
14+
queue: 'email_queue',
15+
queueOptions: {
16+
durable: true,
17+
},
18+
},
19+
},
20+
]),
21+
],
22+
providers: [PrismaOutboxRepository, PrismaService],
23+
exports: [PrismaOutboxRepository],
24+
})
25+
export class OutboxModule {}

apps/superchef/src/prisma/prisma.service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export class PrismaService implements OnModuleInit {
8888
return this.prisma.refreshToken;
8989
}
9090

91+
get outboxEvent() {
92+
return this.prisma.outboxEvent;
93+
}
94+
9195
async onModuleInit() {
9296
await this.prisma.$connect();
9397
}

compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
services:
2+
23
postgres:
34
image: postgres:15-alpine
45
container_name: superchef_db

package-lock.json

Lines changed: 120 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"@nestjs/microservices": "^11.0.0",
3838
"@nestjs/platform-express": "^11.0.1",
3939
"@nestjs/platform-socket.io": "^11.1.12",
40+
"@nestjs/schedule": "^6.1.0",
4041
"@nestjs/swagger": "^11.2.5",
4142
"@nestjs/terminus": "^11.0.0",
4243
"@nestjs/throttler": "^6.3.0",
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
-- CreateEnum
2+
CREATE TYPE "OutboxStatus" AS ENUM ('PENDING', 'PROCESSING', 'PROCESSED', 'FAILED');
3+
4+
-- CreateTable
5+
CREATE TABLE "outbox_event" (
6+
"id" TEXT NOT NULL,
7+
"topic" TEXT NOT NULL,
8+
"payload" JSONB NOT NULL,
9+
"status" "OutboxStatus" NOT NULL DEFAULT 'PENDING',
10+
"error" TEXT,
11+
"attempts" INTEGER NOT NULL DEFAULT 0,
12+
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
13+
"updated_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
14+
15+
CONSTRAINT "outbox_event_pkey" PRIMARY KEY ("id")
16+
);
17+
18+
-- CreateIndex
19+
CREATE INDEX "outbox_event_status_created_at_idx" ON "outbox_event"("status", "created_at");

prisma/schema.prisma

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,24 @@ model RefreshToken {
131131
@@unique([userId, deviceId])
132132
@@map("refresh_token")
133133
}
134+
135+
enum OutboxStatus {
136+
PENDING
137+
PROCESSING
138+
PROCESSED
139+
FAILED
140+
}
141+
142+
model OutboxEvent {
143+
id String @id @default(uuid())
144+
topic String
145+
payload Json
146+
status OutboxStatus @default(PENDING)
147+
error String?
148+
attempts Int @default(0)
149+
createdAt DateTime @default(now()) @map("created_at")
150+
updatedAt DateTime @default(now()) @map("updated_at")
151+
152+
@@index([status, createdAt])
153+
@@map("outbox_event")
154+
}

0 commit comments

Comments
 (0)