Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions db/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@

CREATE TABLE payments (
id UUID PRIMARY KEY,
amount NUMERIC NOT NULL,
currency VARCHAR(10) NOT NULL,
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP DEFAULT now()
);
CREATE TABLE outbox_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
type VARCHAR(50) NOT NULL,
payload JSONB NOT NULL,
published BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT now()
);
CREATE TABLE payment_ack (
payment_id UUID NOT NULL,
consumer VARCHAR(50) NOT NULL,
status VARCHAR(20) NOT NULL,
PRIMARY KEY (payment_id, consumer)
);
CREATE TABLE processed_events (
event_id UUID NOT NULL,
consumer VARCHAR(50) NOT NULL,
processed_at TIMESTAMP DEFAULT now(),
PRIMARY KEY (event_id, consumer)
);
127 changes: 127 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
version: "3.9"

services:
postgres:
image: postgres:16
container_name: payment-platform-postgres
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: payments
ports:
- "5432:5432"
volumes:
- ./db/schema.sql:/docker-entrypoint-initdb.d/schema.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5

zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
container_name: payment-platform-zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:7.6.0
container_name: payment-platform-kafka
depends_on:
zookeeper:
condition: service_started
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test:
["CMD", "kafka-topics", "--bootstrap-server", "localhost:9092", "--list"]
interval: 10s
timeout: 10s
retries: 10

# 🔹 Init job (best-effort): crea el tópico si no existe
kafka-init:
image: confluentinc/cp-kafka:7.6.0
container_name: payment-platform-kafka-init
depends_on:
kafka:
condition: service_healthy
entrypoint: ["/bin/sh", "-ec"]
command: >
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic payment.created.v1 --partitions 3 --replication-factor 1 &&
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic payment.settled.v1 --partitions 3 --replication-factor 1 &&
kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic payment.failed.v1 --partitions 3 --replication-factor 1

payment-api:
build: ./payment-api
container_name: payment-platform-payment-api
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/payments
ports:
- "3000:3000"
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy

outbox-relay:
build: ./outbox-relay
container_name: payment-platform-outbox-relay
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/payments
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy

fraud-consumer:
build: ./fraud-consumer
container_name: payment-platform-fraud-consumer
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/payments
CONSUMER_NAME: fraud
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy

ledger-consumer:
build: ./ledger-consumer
container_name: payment-platform-ledger-consumer
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/payments
CONSUMER_NAME: ledger
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy

notify-consumer:
build: ./notify-consumer
container_name: payment-platform-notify-consumer
depends_on:
kafka:
condition: service_healthy

status-worker:
build: ./status-worker
container_name: payment-platform-status-worker
environment:
DATABASE_URL: postgres://postgres:postgres@postgres:5432/payments
depends_on:
postgres:
condition: service_healthy
kafka:
condition: service_healthy
8 changes: 8 additions & 0 deletions fraud-consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
CMD ["node", "dist/main.js"]
22 changes: 22 additions & 0 deletions fraud-consumer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "payment-api",
"version": "1.0.0",
"private": true,
"scripts": {
"build": "tsc"
},
"dependencies": {
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"kafkajs": "^2.2.4",
"pg": "^8.11.5",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.1",
"typescript": "^5.4.5"
},
"devDependencies": {
"@types/node": "^20.11.30",
"@types/pg": "^8.11.6"
}
}
7 changes: 7 additions & 0 deletions fraud-consumer/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { DbService } from './db.service';
import { WorkerService } from './worker.service';
@Module({ imports: [ScheduleModule.forRoot()], providers: [DbService, WorkerService] })
export class AppModule {}
7 changes: 7 additions & 0 deletions fraud-consumer/src/db.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
@Injectable()
export class DbService {
pool = new Pool({ connectionString: process.env.DATABASE_URL });
}
7 changes: 7 additions & 0 deletions fraud-consumer/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
await NestFactory.createApplicationContext(AppModule);
}
bootstrap();
112 changes: 112 additions & 0 deletions fraud-consumer/src/worker.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka } from 'kafkajs';
import { DbService } from './db.service';

const sleep = (ms: number) =>
new Promise(resolve => setTimeout(resolve, ms));

@Injectable()
export class WorkerService implements OnModuleInit {
private readonly consumerName = process.env.CONSUMER_NAME!;
private readonly topic = 'payment.created.v1';

constructor(private readonly db: DbService) {}

async onModuleInit() {
const kafka = new Kafka({
brokers: ['kafka:9092'],
retry: {
retries: 10,
initialRetryTime: 300,
},
});

const consumer = kafka.consumer({
groupId: this.consumerName,
allowAutoTopicCreation: true,
});

// 🔁 Retry de arranque para tolerar:
// - leader election
// - metadata refresh
// - topic propagation
while (true) {
try {
console.log(`[${this.consumerName}] Connecting to Kafka...`);

await consumer.connect();

console.log(`[${this.consumerName}] Subscribing to topic: ${this.topic}`);

await consumer.subscribe({
topic: this.topic,
fromBeginning: false,
});

console.log(`[${this.consumerName}] Starting consumption`);

await consumer.run({
eachMessage: async ({ message }) => {
if (!message.value) return;

const event = JSON.parse(message.value.toString());
await this.handleEvent(event);
},
});

break; // ✅ Arranque exitoso
} catch (err: any) {
console.error(
`[${this.consumerName}] Kafka not ready yet, retrying...`,
err.message,
);
await sleep(2000);
}
}
}

private async handleEvent(event: any) {
const client = await this.db.pool.connect();

try {
await client.query('BEGIN');

// ✅ Idempotencia por evento + consumer
const exists = await client.query(
`
SELECT 1
FROM processed_events
WHERE event_id = $1 AND consumer = $2
`,
[event.eventId, this.consumerName],
);

if (exists.rowCount === 0) {
// ✅ Registrar ACK
await client.query(
`
INSERT INTO payment_ack (payment_id, consumer, status)
VALUES ($1, $2, 'ok')
`,
[event.aggregateId, this.consumerName],
);

// ✅ Marcar evento como procesado
await client.query(
`
INSERT INTO processed_events (event_id, consumer, processed_at)
VALUES ($1, $2, now())
`,
[event.eventId, this.consumerName],
);
}

await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
}
12 changes: 12 additions & 0 deletions fraud-consumer/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"compilerOptions": {
"strict": true,
"target": "ES2022",
"module": "commonjs",
"outDir": "dist",
"experimentalDecorators": true,
"emitDecoratorMetadata": true,
"types": ["node"],
"lib": ["ES2022"]
}
}
8 changes: 8 additions & 0 deletions ledger-consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build
CMD ["node", "dist/main.js"]
22 changes: 22 additions & 0 deletions ledger-consumer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "payment-api",
"version": "1.0.0",
"private": true,
"scripts": {
"build": "tsc"
},
"dependencies": {
"@nestjs/common": "^10.0.0",
"@nestjs/core": "^10.0.0",
"@nestjs/schedule": "^4.0.0",
"kafkajs": "^2.2.4",
"pg": "^8.11.5",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.1",
"typescript": "^5.4.5"
},
"devDependencies": {
"@types/node": "^20.11.30",
"@types/pg": "^8.11.6"
}
}
7 changes: 7 additions & 0 deletions ledger-consumer/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { DbService } from './db.service';
import { WorkerService } from './worker.service';
@Module({ imports: [ScheduleModule.forRoot()], providers: [DbService, WorkerService] })
export class AppModule {}
7 changes: 7 additions & 0 deletions ledger-consumer/src/db.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { Injectable } from '@nestjs/common';
import { Pool } from 'pg';
@Injectable()
export class DbService {
pool = new Pool({ connectionString: process.env.DATABASE_URL });
}
7 changes: 7 additions & 0 deletions ledger-consumer/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
await NestFactory.createApplicationContext(AppModule);
}
bootstrap();
Loading