diff --git a/README.md b/README.md
index b067a71026..abff8bb0cf 100644
--- a/README.md
+++ b/README.md
@@ -1,82 +1,568 @@
-# Yape Code Challenge :rocket:
+# Payment Settlement Challenge 1
-Our code challenge will let you marvel us with your Jedi coding skills :smile:.
+Proyecto de referencia para el **Challenge 1: payment settlement pipeline**. Implementa un pipeline de pagos con **Transactional Outbox**, **Kafka**, **consumidores idempotentes**, **agregación de estado** y **endpoint de consulta con consistencia eventual honesta**, que son justamente las capacidades pedidas por el reto. fileciteturn0file0
-Don't forget that the proper way to submit your work is to fork the repo and create a PR :wink: ... have fun !!
+---
-- [Problem](#problem)
-- [Tech Stack](#tech_stack)
-- [Send us your challenge](#send_us_your_challenge)
+## 1. Objetivo del proyecto
-# Problem
+Cuando un cliente crea un pago, el sistema debe:
-Every time a financial transaction is created it must be validated by our anti-fraud microservice and then the same service sends a message back to update the transaction status.
-For now, we have only three transaction statuses:
+1. Persistir el pago en PostgreSQL.
+2. Persistir en la misma transacción un evento en la tabla `outbox_events`.
+3. Publicar ese evento a Kafka desde un proceso relay separado del flujo HTTP.
+4. Ejecutar en paralelo dos consumidores idempotentes:
+ - fraude,
+ - ledger.
+5. Consolidar el estado final del pago como:
+ - `pending`,
+ - `settled`,
+ - `failed`.
+6. Notificar el resultado final.
-
- - pending
- - approved
- - rejected
-
+El documento del reto remarca que **no se debe publicar a Kafka dentro de la transacción de base de datos**, y que la solución correcta debe usar un **outbox transaccional** y consumidores **idempotentes**. fileciteturn0file0
-Every transaction with a value greater than 1000 should be rejected.
+---
-```mermaid
- flowchart LR
- Transaction -- Save Transaction with pending Status --> transactionDatabase[(Database)]
- Transaction --Send transaction Created event--> Anti-Fraud
- Anti-Fraud -- Send transaction Status Approved event--> Transaction
- Anti-Fraud -- Send transaction Status Rejected event--> Transaction
- Transaction -- Update transaction Status event--> transactionDatabase[(Database)]
-```
+## 2. Stack tecnológico elegido y por qué
+
+### NestJS
+Se eligió NestJS porque:
+- ofrece estructura modular clara,
+- facilita separar bounded contexts por módulo,
+- integra bien con TypeORM, validación y DI,
+- es ideal para mostrar código mantenible y entendible en una entrevista técnica.
+
+### PostgreSQL
+Se eligió PostgreSQL porque:
+- soporta transacciones locales robustas,
+- es excelente para implementar el patrón Transactional Outbox,
+- permite índices y consultas eficientes sobre la tabla outbox,
+- es simple de levantar localmente con Docker.
+
+### Kafka
+Se eligió Kafka porque:
+- es el broker propuesto implícitamente por el reto,
+- desacopla el API de los consumidores,
+- permite fan-out del evento `payment.created.v1` a múltiples procesos,
+- obliga a diseñar con redelivery e idempotencia, justo lo que se busca evaluar. fileciteturn0file0
+
+### TypeORM
+Se eligió TypeORM porque:
+- acelera la implementación en NestJS,
+- permite transacciones sencillas,
+- reduce el tiempo de wiring para enfocarse en arquitectura y no solo en plumbing.
+
+### Docker Compose
+Se eligió Docker Compose porque:
+- permite levantar PostgreSQL + Kafka + Zookeeper rápidamente,
+- deja un entorno repetible para revisión técnica,
+- reduce fricción para quien revise el challenge.
+
+---
+
+## 3. Arquitectura técnica
+
+### Componentes principales
+
+- **Payment API**
+ - recibe `POST /payments`
+ - valida input
+ - guarda `payments` + `outbox_events` en una sola transacción
+ - expone `GET /payments/:id`
-# Tech Stack
+- **Outbox Relay**
+ - hace polling de `outbox_events`
+ - publica eventos pendientes a Kafka
+ - marca eventos como `PUBLISHED`
+ - reintenta si falla
-
- - Node. You can use any framework you want (i.e. Nestjs with an ORM like TypeOrm or Prisma)
- - Any database
- - Kafka
-
+- **Fraud Consumer**
+ - consume `payment.created.v1`
+ - aplica una regla simple de fraude
+ - emite `payment.fraud-reviewed.v1` o `payment.failed.v1`
-We do provide a `Dockerfile` to help you get started with a dev environment.
+- **Ledger Consumer**
+ - consume `payment.created.v1`
+ - escribe en `ledger_entries`
+ - emite `payment.ledger-written.v1`
-You must have two resources:
+- **Status Aggregator**
+ - consume `payment.fraud-reviewed.v1`, `payment.ledger-written.v1` y `payment.failed.v1`
+ - consolida el estado final del pago
+ - emite `payment.settled.v1` cuando ambos pasos obligatorios se completan
-1. Resource to create a transaction that must containt:
+- **Notify Consumer**
+ - consume `payment.settled.v1` y `payment.failed.v1`
+ - persiste un registro de notificación en la tabla `notifications`
+
+---
+
+## 4. Flujo de negocio paso a paso
+
+### Paso 1. Crear pago
+
+El cliente llama:
+
+```http
+POST /payments
+```
+
+Body ejemplo:
```json
{
- "accountExternalIdDebit": "Guid",
- "accountExternalIdCredit": "Guid",
- "tranferTypeId": 1,
- "value": 120
+ "userId": "usr_001",
+ "country": "PE",
+ "amount": 100.50,
+ "currency": "PEN"
}
```
-2. Resource to retrieve a transaction
+El servicio `PaymentService`:
+- inserta en `payments`,
+- inserta en `payment_processing_status`,
+- inserta un evento en `outbox_events`,
+- todo en **una sola transacción local**.
+
+La respuesta es:
```json
{
- "transactionExternalId": "Guid",
- "transactionType": {
- "name": ""
- },
- "transactionStatus": {
- "name": ""
- },
- "value": 120,
- "createdAt": "Date"
+ "paymentId": "...",
+ "status": "pending",
+ "updatedAt": "..."
}
```
-## Optional
+Esto es importante porque el reto pide que el endpoint de estado sea **honesto con la consistencia eventual**. fileciteturn0file0
+
+### Paso 2. Relay publica el outbox a Kafka
+
+El `OutboxRelayService` hace polling cada pocos segundos:
+- busca eventos `PENDING` o `FAILED` listos para reintento,
+- publica a Kafka,
+- actualiza el outbox a `PUBLISHED`.
+
+### Paso 3. Consumidores paralelos
+
+Kafka entrega `payment.created.v1` a dos consumidores diferentes:
+- fraude,
+- ledger.
+
+Ambos son idempotentes y verifican si el `eventId` ya fue procesado.
+
+### Paso 4. Agregación del estado
+
+`StatusAggregatorService` escucha:
+- `payment.fraud-reviewed.v1`
+- `payment.ledger-written.v1`
+- `payment.failed.v1`
+
+Reglas:
+- si fraude y ledger terminan bien → `settled`
+- si cualquiera falla definitivamente → `failed`
+- mientras tanto → `pending`
+
+### Paso 5. Notificación
+
+Cuando el pago termina en `settled` o `failed`, se registra una notificación.
+
+---
+
+## 5. Decisiones técnicas importantes y defensa frente al reto
+
+### 5.1 Por qué Transactional Outbox
+
+Porque el reto explícitamente espera que el pago y el evento se escriban en la misma transacción local, y luego un relay separado los publique al broker. fileciteturn0file0
+
+Esto evita el antipatrón de:
+- escribir en DB,
+- publicar a Kafka,
+- y que uno falle mientras el otro no.
+
+#### Lo que **no** hacemos
+
+```ts
+await transaction(async () => {
+ await savePayment();
+ await kafka.publish(...); // MAL
+});
+```
+
+#### Lo que sí hacemos
+
+```ts
+await transaction(async () => {
+ await savePayment();
+ await saveOutboxEvent();
+});
+```
+
+Y luego el relay publica desde la tabla outbox.
+
+### 5.2 Por qué el relay es un proceso separado conceptualmente
+
+El reto pide explícitamente un relay separado del request path, no un publish directo dentro del mismo flujo transaccional. fileciteturn0file0
+
+En esta versión de referencia el relay corre dentro de la misma aplicación NestJS para simplificar el demo local, pero **conceptualmente es un proceso aparte** y puede separarse fácilmente en otra app NestJS o worker.
+
+### 5.3 Por qué los consumidores son idempotentes
+
+Kafka puede redeliverar mensajes. El reto espera que el candidato acepte esa realidad y la resuelva del lado consumidor. fileciteturn0file0
+
+Por eso cada consumidor revisa la tabla `consumer_processed_events` usando:
+- `consumer_name`
+- `event_id`
+
+Si ya existe el registro, el mensaje se ignora.
+
+### 5.4 Por qué el endpoint devuelve `pending`
+
+Porque el pago todavía no está listo cuando termina el request HTTP. Falta fraude, ledger y agregación. El sistema es eventualmente consistente. El reto valora que esto se explique con honestidad. fileciteturn0file0
+
+---
+
+## 6. Respuesta técnica a lo que pide el challenge 1
+
+### Requisito 1: Transactional Outbox
+**Cumplido.**
+- Tabla `outbox_events`.
+- Inserción del pago y el evento en la misma transacción.
+- Publicación real desde `OutboxRelayService`.
+
+### Requisito 2: al menos dos consumidores idempotentes
+**Cumplido.**
+- `FraudConsumerService`
+- `LedgerConsumerService`
+- ambos verifican `consumer_processed_events`.
+
+### Requisito 3: DLT / manejo de errores
+**Parcialmente cubierto en esta referencia.**
+- Ya hay reintentos en el relay.
+- La estructura permite agregar un DLT topic como siguiente mejora.
+- En el README se documenta cómo extenderlo.
+
+### Requisito 4: endpoint de estado
+**Cumplido.**
+- `GET /payments/:id`
+- devuelve `pending`, `settled` o `failed`.
+- incluye subestados (`fraudStatus`, `ledgerStatus`) para trazabilidad.
+
+### Requisito 5: honestidad sobre eventual consistency
+**Cumplido.**
+- `POST /payments` responde `pending`.
+- `GET /payments/:id` muestra estado parcial o final.
+
+---
+
+## 7. Modelo de datos
+
+### `payments`
+Estado de negocio visible al cliente.
+
+### `outbox_events`
+Buffer transaccional para asegurar atomicidad entre persistencia y publicación.
+
+### `payment_processing_status`
+Proyección del progreso del pago:
+- fraude,
+- ledger,
+- final.
+
+### `consumer_processed_events`
+Tabla de idempotencia.
+
+### `ledger_entries`
+Representa el asiento contable realizado por el consumidor ledger.
+
+### `notifications`
+Registra el resultado final comunicado al usuario.
+
+---
+
+## 8. Estructura del proyecto
+
+```text
+payment-settlement-challenge/
+├─ docker-compose.yml
+├─ .env.example
+├─ package.json
+├─ sql/
+│ └─ init.sql
+├─ src/
+│ ├─ app.module.ts
+│ ├─ main.ts
+│ ├─ common/
+│ │ ├─ constants/
+│ │ │ ├─ consumer-names.ts
+│ │ │ └─ topics.ts
+│ │ ├─ contracts/
+│ │ │ └─ events.ts
+│ │ └─ kafka/
+│ │ ├─ kafka.module.ts
+│ │ └─ kafka.service.ts
+│ ├─ payments/
+│ │ ├─ dto/
+│ │ ├─ entities/
+│ │ ├─ payments.controller.ts
+│ │ ├─ payments.module.ts
+│ │ └─ payments.service.ts
+│ ├─ relay/
+│ │ ├─ relay.module.ts
+│ │ └─ outbox-relay.service.ts
+│ ├─ consumers/
+│ │ ├─ consumers.module.ts
+│ │ ├─ idempotency.service.ts
+│ │ ├─ fraud-consumer.service.ts
+│ │ └─ ledger-consumer.service.ts
+│ ├─ aggregator/
+│ │ ├─ aggregator.module.ts
+│ │ └─ status-aggregator.service.ts
+│ ├─ notifications/
+│ │ ├─ notifications.module.ts
+│ │ └─ notify-consumer.service.ts
+│ └─ health/
+│ ├─ health.controller.ts
+│ └─ health.module.ts
+```
+
+---
+
+## 9. Cómo levantar el proyecto desde cero
+
+### 9.1 Requisitos
+- Node.js 20+
+- npm 10+
+- Docker Desktop
+- Git opcional
+
+### 9.2 Instalar dependencias
+
+```bash
+npm install
+```
+
+### 9.3 Crear variables de entorno
+
+#### Windows PowerShell
+```powershell
+Copy-Item .env .env
+```
+
+#### CMD
+```cmd
+copy .env .env
+```
+
+### 9.4 Levantar PostgreSQL y Kafka
+
+```bash
+docker compose up -d
+```
+
+### 9.5 Verificar contenedores
+
+```bash
+docker ps
+```
+
+Debes ver al menos:
+- `payments-postgres`
+- `payments-zookeeper`
+- `payments-kafka`
+
+### 9.6 Levantar la aplicación
+
+```bash
+npm run start:dev
+```
+
+### 9.7 Verificar healthcheck
+
+```bash
+curl http://localhost:3000/health
+```
+
+Respuesta esperada:
+
+```json
+{"status":"ok"}
+```
+
+---
+
+## 10. Script SQL de PostgreSQL
+
+El archivo está en:
+
+```text
+sql/init.sql
+```
+
+Ese script se monta automáticamente en el contenedor PostgreSQL y crea las tablas al iniciar por primera vez el volumen.
+
+### Importante
+Si ya levantaste PostgreSQL antes sin el script, y quieres que lo ejecute de nuevo automáticamente, debes borrar el volumen:
+
+```bash
+docker compose down -v
+docker compose up -d
+```
+
+Si no quieres borrar el volumen, puedes conectarte manualmente y ejecutar `sql/init.sql`.
+
+### Conectarte a PostgreSQL dentro del contenedor
+
+```bash
+docker exec -it payments-postgres psql -U postgres -d payments_db
+```
+
+### Consultas útiles
+
+```sql
+SELECT * FROM payments;
+SELECT * FROM outbox_events ORDER BY created_at DESC;
+SELECT * FROM payment_processing_status;
+SELECT * FROM consumer_processed_events;
+SELECT * FROM ledger_entries;
+SELECT * FROM notifications;
+```
+
+---
+
+## 11. Pruebas manuales del flujo
+
+### Crear un pago exitoso
+
+```bash
+curl -X POST http://localhost:3000/payments \
+ -H "Content-Type: application/json" \
+ -d '{
+ "userId": "usr_001",
+ "country": "PE",
+ "amount": 100.50,
+ "currency": "PEN"
+ }'
+```
+
+Después consulta:
+
+```bash
+curl http://localhost:3000/payments/
+```
+
+Resultado esperado después de unos segundos:
+- `status = settled`
+- `fraudStatus = succeeded`
+- `ledgerStatus = succeeded`
+
+### Crear un pago fallido por fraude
+
+```bash
+curl -X POST http://localhost:3000/payments \
+ -H "Content-Type: application/json" \
+ -d '{
+ "userId": "usr_002",
+ "country": "PE",
+ "amount": 7000,
+ "currency": "PEN"
+ }'
+```
+
+Resultado esperado:
+- `status = failed`
+- `failureReason = Amount exceeds fraud threshold`
+
+---
+
+## 12. Explicación técnica del código para que sea entendible
+
+### `PaymentService`
+Escribe el pago y el outbox **en la misma transacción**. Esa es la parte más importante del challenge.
+
+### `OutboxRelayService`
+Hace polling del outbox y publica a Kafka. Si falla, deja listo el siguiente retry con backoff simple.
+
+### `FraudConsumerService`
+Lee `payment.created.v1`, aplica una regla de fraude simple y emite el evento correspondiente.
+
+### `LedgerConsumerService`
+Lee `payment.created.v1`, persiste `ledger_entries` y emite `payment.ledger-written.v1`.
+
+### `StatusAggregatorService`
+Actualiza la tabla de proyección `payment_processing_status` y decide si ya se puede cerrar el pago como `settled`.
+
+### `NotifyConsumerService`
+No cambia lógica de negocio; solo registra una notificación final. Se mantiene separado para respetar responsabilidades.
+
+---
+
+## 13. Trade-offs y limitaciones declaradas
+
+### 13.1 Relay dentro de la misma app
+Para demo local corre en la misma app, pero en producción lo separaría en un worker independiente.
+
+### 13.2 DLT no implementado completamente
+El reto lo menciona como requisito deseable. Aquí dejamos reintentos en relay y base lista para ampliar a `{topic}.dlt`.
+
+### 13.3 Regla de fraude simplificada
+Se usa una regla por monto para tener una demo reproducible. En un caso real el fraude sería otro servicio o motor de reglas.
+
+### 13.4 Sin observabilidad avanzada
+No incluí OpenTelemetry ni Prometheus para mantener foco en la arquitectura principal.
+
+---
+
+## 14. Mejoras que haría con más tiempo
+
+- separar `payment-api` y `outbox-relay` en apps distintas,
+- agregar DLT real por topic,
+- agregar métricas y tracing,
+- agregar tests unitarios e integración,
+- usar Debezium/CDC en lugar de polling del outbox,
+- endurecer la idempotencia con transacciones locales adicionales en consumidores.
+
+---
+
+## 15. Cómo defender esta solución en el PR o entrevista
+
+### Qué problema resuelve el Outbox
+Evita inconsistencias entre base de datos y broker.
+
+### Qué problema resuelve la idempotencia
+Tolera redelivery y retries sin duplicar efectos observables.
+
+### Qué problema resuelve la proyección de estado
+Permite responder `GET /payments/:id` de forma rápida y honesta.
+
+### Qué trade-off acepté
+Preferí simplicidad operativa con polling relay y reglas de ejemplo, a cambio de una implementación clara y defendible del patrón principal.
+
+---
+
+## 16. Endpoints
+
+### POST `/payments`
+Crea el pago y responde inmediatamente con estado `pending`.
+
+### GET `/payments/:id`
+Consulta el estado actual del pago.
+
+### GET `/health`
+Healthcheck básico.
+
+---
+
+## 17. Resumen final
-You can use any approach to store transaction data but you should consider that we may deal with high volume scenarios where we have a huge amount of writes and reads for the same data at the same time. How would you tackle this requirement?
+Este proyecto responde al Challenge 1 porque muestra exactamente lo que el documento está evaluando:
+- arquitectura distribuida simple pero correcta,
+- uso de Transactional Outbox,
+- consumidores idempotentes,
+- consistencia eventual explicada con honestidad,
+- modularidad y claridad técnica. fileciteturn0file0
-You can use Graphql;
-# Send us your challenge
-When you finish your challenge, after forking a repository, you **must** open a pull request to our repository. There are no limitations to the implementation, you can follow the programming paradigm, modularization, and style that you feel is the most appropriate solution.
-If you have any questions, please let us know.
diff --git a/docker-compose.yml b/docker-compose.yml
index 0e8807f21c..16367dcddf 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,25 +1,50 @@
-version: "3.7"
+version: '3.9'
+
services:
postgres:
- image: postgres:14
+ image: postgres:16
+ container_name: payments-postgres
+ restart: always
+ environment:
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_DB: payments_db
ports:
- "5432:5432"
- environment:
- - POSTGRES_USER=postgres
- - POSTGRES_PASSWORD=postgres
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+ - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
+
zookeeper:
- image: confluentinc/cp-zookeeper:5.5.3
+ image: confluentinc/cp-zookeeper:7.5.0
+ container_name: payments-zookeeper
+ restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - "2181:2181"
+
kafka:
- image: confluentinc/cp-enterprise-kafka:5.5.3
- depends_on: [zookeeper]
+ image: confluentinc/cp-kafka:7.5.0
+ container_name: payments-kafka
+ restart: always
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ - "29092:29092"
environment:
- KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
- KAFKA_JMX_PORT: 9991
- ports:
- - 9092:9092
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+
+volumes:
+ postgres_data:
diff --git a/nest-cli.json b/nest-cli.json
new file mode 100644
index 0000000000..256648114a
--- /dev/null
+++ b/nest-cli.json
@@ -0,0 +1,5 @@
+{
+ "$schema": "https://json.schemastore.org/nest-cli",
+ "collection": "@nestjs/schematics",
+ "sourceRoot": "src"
+}
diff --git a/package.json b/package.json
new file mode 100644
index 0000000000..e9b3cd80b7
--- /dev/null
+++ b/package.json
@@ -0,0 +1,38 @@
+{
+ "name": "payment-settlement-challenge",
+ "version": "1.0.0",
+ "description": "Challenge 1 - payment settlement pipeline with transactional outbox, idempotent consumers and status aggregation",
+ "private": true,
+ "license": "UNLICENSED",
+ "scripts": {
+ "build": "nest build",
+ "start": "nest start",
+ "start:dev": "nest start --watch",
+ "start:prod": "node dist/main",
+ "lint": "echo 'lint not configured'",
+ "test": "echo 'tests not configured'"
+ },
+ "dependencies": {
+ "@nestjs/common": "^10.4.2",
+ "@nestjs/config": "^3.2.3",
+ "@nestjs/core": "^10.4.2",
+ "@nestjs/platform-express": "^10.4.2",
+ "@nestjs/typeorm": "^10.0.2",
+ "class-transformer": "^0.5.1",
+ "class-validator": "^0.14.1",
+ "kafkajs": "^2.2.4",
+ "pg": "^8.12.0",
+ "reflect-metadata": "^0.2.2",
+ "rxjs": "^7.8.1",
+ "typeorm": "^0.3.20"
+ },
+ "devDependencies": {
+ "@nestjs/cli": "^10.4.5",
+ "@nestjs/schematics": "^10.1.4",
+ "@nestjs/testing": "^10.4.2",
+ "@types/node": "^20.14.15",
+ "ts-loader": "^9.5.1",
+ "ts-node": "^10.9.2",
+ "typescript": "^5.5.4"
+ }
+}
diff --git a/sql/init.sql b/sql/init.sql
new file mode 100644
index 0000000000..51ced88ce9
--- /dev/null
+++ b/sql/init.sql
@@ -0,0 +1,66 @@
+CREATE TABLE IF NOT EXISTS payments (
+ id UUID PRIMARY KEY,
+ user_id VARCHAR(64) NOT NULL,
+ country VARCHAR(8) NOT NULL,
+ amount NUMERIC(18,2) NOT NULL,
+ currency VARCHAR(8) NOT NULL,
+ status VARCHAR(16) NOT NULL DEFAULT 'pending',
+ created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+ updated_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE TABLE IF NOT EXISTS outbox_events (
+ id UUID PRIMARY KEY,
+ aggregate_id UUID NOT NULL,
+ aggregate_type VARCHAR(50) NOT NULL,
+ topic VARCHAR(255) NOT NULL,
+ event_type VARCHAR(255) NOT NULL,
+ event_version INT NOT NULL,
+ payload JSONB NOT NULL,
+ headers JSONB NOT NULL DEFAULT '{}'::jsonb,
+ status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
+ retry_count INT NOT NULL DEFAULT 0,
+ next_retry_at TIMESTAMP NULL,
+ published_at TIMESTAMP NULL,
+ last_error TEXT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry
+ ON outbox_events(status, next_retry_at, created_at);
+
+CREATE TABLE IF NOT EXISTS payment_processing_status (
+ payment_id UUID PRIMARY KEY,
+ fraud_status VARCHAR(16) NOT NULL DEFAULT 'pending',
+ ledger_status VARCHAR(16) NOT NULL DEFAULT 'pending',
+ final_status VARCHAR(16) NOT NULL DEFAULT 'pending',
+ failure_reason TEXT NULL,
+ updated_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE TABLE IF NOT EXISTS consumer_processed_events (
+ consumer_name VARCHAR(100) NOT NULL,
+ event_id UUID NOT NULL,
+ aggregate_id UUID NOT NULL,
+ processed_at TIMESTAMP NOT NULL DEFAULT NOW(),
+ PRIMARY KEY (consumer_name, event_id)
+);
+
+CREATE TABLE IF NOT EXISTS ledger_entries (
+ id UUID PRIMARY KEY,
+ payment_id UUID NOT NULL,
+ debit_account VARCHAR(100) NOT NULL,
+ credit_account VARCHAR(100) NOT NULL,
+ amount NUMERIC(18,2) NOT NULL,
+ currency VARCHAR(8) NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
+
+CREATE TABLE IF NOT EXISTS notifications (
+ id UUID PRIMARY KEY,
+ payment_id UUID NOT NULL,
+ channel VARCHAR(20) NOT NULL,
+ status VARCHAR(20) NOT NULL,
+ payload JSONB NOT NULL,
+ created_at TIMESTAMP NOT NULL DEFAULT NOW()
+);
diff --git a/src/aggregator/aggregator.module.ts b/src/aggregator/aggregator.module.ts
new file mode 100644
index 0000000000..ecb8f85ac1
--- /dev/null
+++ b/src/aggregator/aggregator.module.ts
@@ -0,0 +1,12 @@
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { StatusAggregatorService } from './status-aggregator.service';
+import { PaymentEntity } from '../payments/entities/payment.entity';
+import { PaymentProcessingStatusEntity } from '../payments/entities/payment-processing-status.entity';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+
+@Module({
+ imports: [TypeOrmModule.forFeature([PaymentEntity, PaymentProcessingStatusEntity, ConsumerProcessedEventEntity])],
+ providers: [StatusAggregatorService],
+})
+export class AggregatorModule {}
diff --git a/src/aggregator/status-aggregator.service.ts b/src/aggregator/status-aggregator.service.ts
new file mode 100644
index 0000000000..7b8c111204
--- /dev/null
+++ b/src/aggregator/status-aggregator.service.ts
@@ -0,0 +1,126 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { KafkaService } from '../common/kafka/kafka.service';
+import { CONSUMER_NAMES } from '../common/constants/consumer-names';
+import { TOPICS } from '../common/constants/topics';
+import {
+ BaseEvent,
+ PaymentFailedPayload,
+ PaymentFraudReviewedPayload,
+ PaymentLedgerWrittenPayload,
+ PaymentSettledPayload,
+} from '../common/contracts/events';
+import { PaymentEntity } from '../payments/entities/payment.entity';
+import { PaymentProcessingStatusEntity } from '../payments/entities/payment-processing-status.entity';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+import { randomUUID } from 'crypto';
+
+@Injectable()
+export class StatusAggregatorService implements OnModuleInit {
+ private readonly logger = new Logger(StatusAggregatorService.name);
+
+ constructor(
+ private readonly configService: ConfigService,
+ private readonly kafkaService: KafkaService,
+ @InjectRepository(PaymentEntity)
+ private readonly paymentRepository: Repository,
+ @InjectRepository(PaymentProcessingStatusEntity)
+ private readonly statusRepository: Repository,
+ @InjectRepository(ConsumerProcessedEventEntity)
+ private readonly processedRepository: Repository,
+ ) {}
+
+ async onModuleInit(): Promise {
+ const groupId = this.configService.get('KAFKA_GROUP_STATUS', 'payment-status-group');
+
+ await this.kafkaService.subscribe({
+ groupId: `${groupId}-fraud`,
+ topic: TOPICS.PAYMENT_FRAUD_REVIEWED,
+ eachMessage: async ({ value }) => this.handleFraudReviewed(value as BaseEvent),
+ });
+ await this.kafkaService.subscribe({
+ groupId: `${groupId}-ledger`,
+ topic: TOPICS.PAYMENT_LEDGER_WRITTEN,
+ eachMessage: async ({ value }) => this.handleLedgerWritten(value as BaseEvent),
+ });
+ await this.kafkaService.subscribe({
+ groupId: `${groupId}-failed`,
+ topic: TOPICS.PAYMENT_FAILED,
+ eachMessage: async ({ value }) => this.handleFailed(value as BaseEvent),
+ });
+ }
+
+ private async wasProcessed(consumerName: string, eventId: string): Promise {
+ return Boolean(await this.processedRepository.findOne({ where: { consumerName, eventId } }));
+ }
+
+ private async markProcessed(consumerName: string, eventId: string, aggregateId: string): Promise {
+ await this.processedRepository.save(
+ this.processedRepository.create({ consumerName, eventId, aggregateId, processedAt: new Date() }),
+ );
+ }
+
+ private async handleFraudReviewed(event: BaseEvent): Promise {
+ if (await this.wasProcessed(CONSUMER_NAMES.STATUS_FRAUD, event.eventId)) {
+ return;
+ }
+ await this.statusRepository.update({ paymentId: event.payload.paymentId }, { fraudStatus: 'succeeded' });
+ await this.tryFinalize(event.payload.paymentId, event);
+ await this.markProcessed(CONSUMER_NAMES.STATUS_FRAUD, event.eventId, event.aggregateId);
+ }
+
+ private async handleLedgerWritten(event: BaseEvent): Promise {
+ if (await this.wasProcessed(CONSUMER_NAMES.STATUS_LEDGER, event.eventId)) {
+ return;
+ }
+ await this.statusRepository.update({ paymentId: event.payload.paymentId }, { ledgerStatus: 'succeeded' });
+ await this.tryFinalize(event.payload.paymentId, event);
+ await this.markProcessed(CONSUMER_NAMES.STATUS_LEDGER, event.eventId, event.aggregateId);
+ }
+
+ private async handleFailed(event: BaseEvent): Promise {
+ if (await this.wasProcessed(CONSUMER_NAMES.STATUS_FAILED, event.eventId)) {
+ return;
+ }
+
+ await this.statusRepository.update(
+ { paymentId: event.payload.paymentId },
+ { finalStatus: 'failed', failureReason: event.payload.reason },
+ );
+ await this.paymentRepository.update({ id: event.payload.paymentId }, { status: 'failed' });
+ await this.markProcessed(CONSUMER_NAMES.STATUS_FAILED, event.eventId, event.aggregateId);
+ this.logger.warn(`Payment ${event.payload.paymentId} marked as failed`);
+ }
+
+ private async tryFinalize(paymentId: string, triggeringEvent: BaseEvent): Promise {
+ const status = await this.statusRepository.findOne({ where: { paymentId } });
+ if (!status || status.finalStatus !== 'pending') {
+ return;
+ }
+
+ if (status.fraudStatus === 'succeeded' && status.ledgerStatus === 'succeeded') {
+ await this.statusRepository.update({ paymentId }, { finalStatus: 'settled' });
+ await this.paymentRepository.update({ id: paymentId }, { status: 'settled' });
+
+ const settledEvent: BaseEvent = {
+ eventId: randomUUID(),
+ eventType: 'payment.settled',
+ eventVersion: 1,
+ occurredAt: new Date().toISOString(),
+ correlationId: triggeringEvent.correlationId,
+ causationId: triggeringEvent.eventId,
+ aggregateId: paymentId,
+ aggregateType: 'payment',
+ country: triggeringEvent.country as string,
+ payload: {
+ paymentId,
+ settledAt: new Date().toISOString(),
+ },
+ };
+ await this.kafkaService.publish(TOPICS.PAYMENT_SETTLED, paymentId, settledEvent as unknown as Record);
+ this.logger.log(`Payment ${paymentId} marked as settled`);
+ }
+ }
+}
diff --git a/src/app.module.ts b/src/app.module.ts
new file mode 100644
index 0000000000..9d8ac34567
--- /dev/null
+++ b/src/app.module.ts
@@ -0,0 +1,50 @@
+import { Module } from '@nestjs/common';
+import { ConfigModule, ConfigService } from '@nestjs/config';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { PaymentsModule } from './payments/payments.module';
+import { RelayModule } from './relay/relay.module';
+import { HealthModule } from './health/health.module';
+import { ConsumersModule } from './consumers/consumers.module';
+import { AggregatorModule } from './aggregator/aggregator.module';
+import { NotificationsModule } from './notifications/notifications.module';
+import { PaymentEntity } from './payments/entities/payment.entity';
+import { OutboxEventEntity } from './payments/entities/outbox-event.entity';
+import { PaymentProcessingStatusEntity } from './payments/entities/payment-processing-status.entity';
+import { ConsumerProcessedEventEntity } from './payments/entities/consumer-processed-event.entity';
+import { LedgerEntryEntity } from './payments/entities/ledger-entry.entity';
+import { NotificationEntity } from './payments/entities/notification.entity';
+import { KafkaModule } from './common/kafka/kafka.module';
+
+@Module({
+ imports: [
+ ConfigModule.forRoot({ isGlobal: true }),
+ TypeOrmModule.forRootAsync({
+ inject: [ConfigService],
+ useFactory: (config: ConfigService) => ({
+ type: 'postgres' as const,
+ host: config.get('DB_HOST'),
+ port: Number(config.get('DB_PORT')),
+ username: config.get('DB_USERNAME'),
+ password: config.get('DB_PASSWORD'),
+ database: config.get('DB_NAME'),
+ entities: [
+ PaymentEntity,
+ OutboxEventEntity,
+ PaymentProcessingStatusEntity,
+ ConsumerProcessedEventEntity,
+ LedgerEntryEntity,
+ NotificationEntity,
+ ],
+ synchronize: false,
+ }),
+ }),
+ KafkaModule,
+ PaymentsModule,
+ RelayModule,
+ ConsumersModule,
+ AggregatorModule,
+ NotificationsModule,
+ HealthModule,
+ ],
+})
+export class AppModule {}
diff --git a/src/common/constants/consumer-names.ts b/src/common/constants/consumer-names.ts
new file mode 100644
index 0000000000..5076081c95
--- /dev/null
+++ b/src/common/constants/consumer-names.ts
@@ -0,0 +1,9 @@
+export const CONSUMER_NAMES = {
+ FRAUD: 'fraud-consumer',
+ LEDGER: 'ledger-consumer',
+ STATUS_FRAUD: 'status-aggregator-fraud-reviewed',
+ STATUS_LEDGER: 'status-aggregator-ledger-written',
+ STATUS_FAILED: 'status-aggregator-failed',
+ NOTIFY_SETTLED: 'notify-consumer-settled',
+ NOTIFY_FAILED: 'notify-consumer-failed',
+};
diff --git a/src/common/constants/topics.ts b/src/common/constants/topics.ts
new file mode 100644
index 0000000000..00cd21bef7
--- /dev/null
+++ b/src/common/constants/topics.ts
@@ -0,0 +1,7 @@
+export const TOPICS = {
+ PAYMENT_CREATED: 'payment.created.v1',
+ PAYMENT_FRAUD_REVIEWED: 'payment.fraud-reviewed.v1',
+ PAYMENT_LEDGER_WRITTEN: 'payment.ledger-written.v1',
+ PAYMENT_SETTLED: 'payment.settled.v1',
+ PAYMENT_FAILED: 'payment.failed.v1',
+};
diff --git a/src/common/contracts/events.ts b/src/common/contracts/events.ts
new file mode 100644
index 0000000000..4a82643ee7
--- /dev/null
+++ b/src/common/contracts/events.ts
@@ -0,0 +1,50 @@
+export interface BaseEvent {
+ eventId: string;
+ eventType: string;
+ eventVersion: number;
+ occurredAt: string;
+ correlationId: string;
+ causationId?: string;
+ aggregateId: string;
+ aggregateType: 'payment';
+ country: string;
+ payload: TPayload;
+}
+
+export interface PaymentCreatedPayload {
+ paymentId: string;
+ userId: string;
+ amount: number;
+ currency: string;
+ country: string;
+ createdAt: string;
+}
+
+export interface PaymentFraudReviewedPayload {
+ paymentId: string;
+ approved: boolean;
+ riskScore: number;
+ reason?: string;
+}
+
+export interface PaymentLedgerWrittenPayload {
+ paymentId: string;
+ ledgerEntryId: string;
+ debitAccount: string;
+ creditAccount: string;
+ amount: number;
+ currency: string;
+}
+
+export interface PaymentSettledPayload {
+ paymentId: string;
+ settledAt: string;
+}
+
+export interface PaymentFailedPayload {
+ paymentId: string;
+ failedAt: string;
+ source: string;
+ reason: string;
+ retryable: boolean;
+}
diff --git a/src/common/kafka/kafka.module.ts b/src/common/kafka/kafka.module.ts
new file mode 100644
index 0000000000..c7dcf12e01
--- /dev/null
+++ b/src/common/kafka/kafka.module.ts
@@ -0,0 +1,9 @@
+import { Global, Module } from '@nestjs/common';
+import { KafkaService } from './kafka.service';
+
+@Global()
+@Module({
+ providers: [KafkaService],
+ exports: [KafkaService],
+})
+export class KafkaModule {}
diff --git a/src/common/kafka/kafka.service.ts b/src/common/kafka/kafka.service.ts
new file mode 100644
index 0000000000..2a657f48ac
--- /dev/null
+++ b/src/common/kafka/kafka.service.ts
@@ -0,0 +1,77 @@
+import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { Consumer, Kafka, Producer } from 'kafkajs';
+
+@Injectable()
+export class KafkaService implements OnModuleInit, OnModuleDestroy {
+ private readonly logger = new Logger(KafkaService.name);
+ private readonly kafka: Kafka;
+ private readonly producer: Producer;
+ private readonly consumers: Consumer[] = [];
+
+ constructor(private readonly configService: ConfigService) {
+ this.kafka = new Kafka({
+ clientId: this.configService.get('KAFKA_CLIENT_ID', 'payment-api'),
+ brokers: [this.configService.get('KAFKA_BROKER', 'localhost:9092')],
+ });
+ this.producer = this.kafka.producer();
+ }
+
+ async onModuleInit(): Promise {
+ await this.producer.connect();
+ this.logger.log('Kafka producer connected');
+ }
+
+ async onModuleDestroy(): Promise {
+ for (const consumer of this.consumers) {
+ await consumer.disconnect();
+ }
+ await this.producer.disconnect();
+ this.logger.log('Kafka clients disconnected');
+ }
+
+ async publish(
+ topic: string,
+ key: string,
+ payload: Record,
+ headers?: Record,
+ ): Promise {
+ await this.producer.send({
+ topic,
+ messages: [
+ {
+ key,
+ value: JSON.stringify(payload),
+ headers: headers
+ ? Object.fromEntries(Object.entries(headers).map(([k, v]) => [k, String(v)]))
+ : undefined,
+ },
+ ],
+ });
+ }
+
+ async subscribe(params: {
+ groupId: string;
+ topic: string;
+ fromBeginning?: boolean;
+ eachMessage: (payload: { key?: string; value: Record }) => Promise;
+ }): Promise {
+ const consumer = this.kafka.consumer({ groupId: params.groupId });
+ await consumer.connect();
+ await consumer.subscribe({ topic: params.topic, fromBeginning: params.fromBeginning ?? false });
+ await consumer.run({
+ eachMessage: async ({ message }) => {
+ if (!message.value) {
+ return;
+ }
+ const value = JSON.parse(message.value.toString()) as Record;
+ await params.eachMessage({
+ key: message.key?.toString(),
+ value,
+ });
+ },
+ });
+ this.consumers.push(consumer);
+ this.logger.log(`Kafka consumer subscribed to ${params.topic} with group ${params.groupId}`);
+ }
+}
diff --git a/src/consumers/consumers.module.ts b/src/consumers/consumers.module.ts
new file mode 100644
index 0000000000..4fec15247f
--- /dev/null
+++ b/src/consumers/consumers.module.ts
@@ -0,0 +1,14 @@
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+import { LedgerEntryEntity } from '../payments/entities/ledger-entry.entity';
+import { IdempotencyService } from './idempotency.service';
+import { FraudConsumerService } from './fraud-consumer.service';
+import { LedgerConsumerService } from './ledger-consumer.service';
+
+@Module({
+ imports: [TypeOrmModule.forFeature([ConsumerProcessedEventEntity, LedgerEntryEntity])],
+ providers: [IdempotencyService, FraudConsumerService, LedgerConsumerService],
+ exports: [IdempotencyService],
+})
+export class ConsumersModule {}
diff --git a/src/consumers/fraud-consumer.service.ts b/src/consumers/fraud-consumer.service.ts
new file mode 100644
index 0000000000..ff906de539
--- /dev/null
+++ b/src/consumers/fraud-consumer.service.ts
@@ -0,0 +1,88 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { KafkaService } from '../common/kafka/kafka.service';
+import { CONSUMER_NAMES } from '../common/constants/consumer-names';
+import { TOPICS } from '../common/constants/topics';
+import { BaseEvent, PaymentCreatedPayload, PaymentFailedPayload, PaymentFraudReviewedPayload } from '../common/contracts/events';
+import { IdempotencyService } from './idempotency.service';
+import { randomUUID } from 'crypto';
+
+@Injectable()
+export class FraudConsumerService implements OnModuleInit {
+ private readonly logger = new Logger(FraudConsumerService.name);
+
+ constructor(
+ private readonly configService: ConfigService,
+ private readonly kafkaService: KafkaService,
+ private readonly idempotencyService: IdempotencyService,
+ ) {}
+
+ async onModuleInit(): Promise {
+ await this.kafkaService.subscribe({
+ groupId: this.configService.get('KAFKA_GROUP_PAYMENT_CREATED', 'payment-created-group'),
+ topic: TOPICS.PAYMENT_CREATED,
+ eachMessage: async ({ value }) => this.handlePaymentCreated(value as BaseEvent),
+ });
+ }
+
+ private async handlePaymentCreated(event: BaseEvent): Promise {
+ const alreadyProcessed = await this.idempotencyService.hasProcessed(CONSUMER_NAMES.FRAUD, event.eventId);
+ if (alreadyProcessed) {
+ return;
+ }
+
+ const threshold = Number(this.configService.get('FRAUD_MAX_APPROVED_AMOUNT', 5000));
+ const approved = event.payload.amount <= threshold;
+ const riskScore = approved ? 15 : 95;
+
+ if (!approved) {
+ const failedEvent: BaseEvent = {
+ eventId: randomUUID(),
+ eventType: 'payment.failed',
+ eventVersion: 1,
+ occurredAt: new Date().toISOString(),
+ correlationId: event.correlationId,
+ causationId: event.eventId,
+ aggregateId: event.aggregateId,
+ aggregateType: 'payment',
+ country: event.country,
+ payload: {
+ paymentId: event.payload.paymentId,
+ failedAt: new Date().toISOString(),
+ source: 'fraud-consumer',
+ reason: 'Amount exceeds fraud threshold',
+ retryable: false,
+ },
+ };
+ await this.kafkaService.publish(TOPICS.PAYMENT_FAILED, event.aggregateId, failedEvent as unknown as Record);
+ await this.idempotencyService.markProcessed(CONSUMER_NAMES.FRAUD, event.eventId, event.aggregateId);
+ this.logger.warn(`Payment ${event.aggregateId} rejected by fraud rule`);
+ return;
+ }
+
+ const reviewedEvent: BaseEvent = {
+ eventId: randomUUID(),
+ eventType: 'payment.fraud-reviewed',
+ eventVersion: 1,
+ occurredAt: new Date().toISOString(),
+ correlationId: event.correlationId,
+ causationId: event.eventId,
+ aggregateId: event.aggregateId,
+ aggregateType: 'payment',
+ country: event.country,
+ payload: {
+ paymentId: event.payload.paymentId,
+ approved: true,
+ riskScore,
+ },
+ };
+
+ await this.kafkaService.publish(
+ TOPICS.PAYMENT_FRAUD_REVIEWED,
+ event.aggregateId,
+ reviewedEvent as unknown as Record,
+ );
+ await this.idempotencyService.markProcessed(CONSUMER_NAMES.FRAUD, event.eventId, event.aggregateId);
+ this.logger.log(`Payment ${event.aggregateId} approved by fraud consumer`);
+ }
+}
diff --git a/src/consumers/idempotency.service.ts b/src/consumers/idempotency.service.ts
new file mode 100644
index 0000000000..f9ecf9a013
--- /dev/null
+++ b/src/consumers/idempotency.service.ts
@@ -0,0 +1,29 @@
+import { Injectable } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+
+@Injectable()
+export class IdempotencyService {
+ constructor(
+ @InjectRepository(ConsumerProcessedEventEntity)
+ private readonly processedRepository: Repository,
+ ) {}
+
+ async hasProcessed(consumerName: string, eventId: string): Promise {
+ const record = await this.processedRepository.findOne({
+ where: { consumerName, eventId },
+ });
+ return Boolean(record);
+ }
+
+ async markProcessed(consumerName: string, eventId: string, aggregateId: string): Promise {
+ const entity = this.processedRepository.create({
+ consumerName,
+ eventId,
+ aggregateId,
+ processedAt: new Date(),
+ });
+ await this.processedRepository.save(entity);
+ }
+}
diff --git a/src/consumers/ledger-consumer.service.ts b/src/consumers/ledger-consumer.service.ts
new file mode 100644
index 0000000000..c5297931ae
--- /dev/null
+++ b/src/consumers/ledger-consumer.service.ts
@@ -0,0 +1,81 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { KafkaService } from '../common/kafka/kafka.service';
+import { CONSUMER_NAMES } from '../common/constants/consumer-names';
+import { TOPICS } from '../common/constants/topics';
+import { BaseEvent, PaymentCreatedPayload, PaymentLedgerWrittenPayload } from '../common/contracts/events';
+import { IdempotencyService } from './idempotency.service';
+import { LedgerEntryEntity } from '../payments/entities/ledger-entry.entity';
+import { randomUUID } from 'crypto';
+
+@Injectable()
+export class LedgerConsumerService implements OnModuleInit {
+ private readonly logger = new Logger(LedgerConsumerService.name);
+
+ constructor(
+ private readonly configService: ConfigService,
+ private readonly kafkaService: KafkaService,
+ private readonly idempotencyService: IdempotencyService,
+ @InjectRepository(LedgerEntryEntity)
+ private readonly ledgerRepository: Repository,
+ ) {}
+
+ async onModuleInit(): Promise {
+ await this.kafkaService.subscribe({
+ groupId: `${this.configService.get('KAFKA_GROUP_PAYMENT_CREATED', 'payment-created-group')}-ledger`,
+ topic: TOPICS.PAYMENT_CREATED,
+ eachMessage: async ({ value }) => this.handlePaymentCreated(value as BaseEvent),
+ });
+ }
+
+ private async handlePaymentCreated(event: BaseEvent): Promise {
+ const alreadyProcessed = await this.idempotencyService.hasProcessed(CONSUMER_NAMES.LEDGER, event.eventId);
+ if (alreadyProcessed) {
+ return;
+ }
+
+ const ledgerEntryId = randomUUID();
+ const debitAccount = `wallet:${event.payload.userId}`;
+ const creditAccount = 'merchant:settlement';
+
+ const entry = this.ledgerRepository.create({
+ id: ledgerEntryId,
+ paymentId: event.payload.paymentId,
+ debitAccount,
+ creditAccount,
+ amount: event.payload.amount.toFixed(2),
+ currency: event.payload.currency,
+ });
+ await this.ledgerRepository.save(entry);
+
+ const ledgerWrittenEvent: BaseEvent = {
+ eventId: randomUUID(),
+ eventType: 'payment.ledger-written',
+ eventVersion: 1,
+ occurredAt: new Date().toISOString(),
+ correlationId: event.correlationId,
+ causationId: event.eventId,
+ aggregateId: event.aggregateId,
+ aggregateType: 'payment',
+ country: event.country,
+ payload: {
+ paymentId: event.payload.paymentId,
+ ledgerEntryId,
+ debitAccount,
+ creditAccount,
+ amount: event.payload.amount,
+ currency: event.payload.currency,
+ },
+ };
+
+ await this.kafkaService.publish(
+ TOPICS.PAYMENT_LEDGER_WRITTEN,
+ event.aggregateId,
+ ledgerWrittenEvent as unknown as Record,
+ );
+ await this.idempotencyService.markProcessed(CONSUMER_NAMES.LEDGER, event.eventId, event.aggregateId);
+ this.logger.log(`Payment ${event.aggregateId} posted to ledger`);
+ }
+}
diff --git a/src/health/health.controller.ts b/src/health/health.controller.ts
new file mode 100644
index 0000000000..22cc0ca5a8
--- /dev/null
+++ b/src/health/health.controller.ts
@@ -0,0 +1,9 @@
+import { Controller, Get } from '@nestjs/common';
+
+@Controller('health')
+export class HealthController {
+ @Get()
+ check() {
+ return { status: 'ok' };
+ }
+}
diff --git a/src/health/health.module.ts b/src/health/health.module.ts
new file mode 100644
index 0000000000..7476abedd4
--- /dev/null
+++ b/src/health/health.module.ts
@@ -0,0 +1,7 @@
+import { Module } from '@nestjs/common';
+import { HealthController } from './health.controller';
+
+@Module({
+ controllers: [HealthController],
+})
+export class HealthModule {}
diff --git a/src/main.ts b/src/main.ts
new file mode 100644
index 0000000000..54b2a9f1da
--- /dev/null
+++ b/src/main.ts
@@ -0,0 +1,18 @@
+import { ValidationPipe } from '@nestjs/common';
+import { NestFactory } from '@nestjs/core';
+import { AppModule } from './app.module';
+
+async function bootstrap() {
+ const app = await NestFactory.create(AppModule);
+ app.useGlobalPipes(
+ new ValidationPipe({
+ whitelist: true,
+ forbidNonWhitelisted: true,
+ transform: true,
+ }),
+ );
+
+ await app.listen(process.env.PORT || 3000);
+ console.log(`Application running on http://localhost:${process.env.PORT || 3000}`);
+}
+bootstrap();
diff --git a/src/notifications/notifications.module.ts b/src/notifications/notifications.module.ts
new file mode 100644
index 0000000000..d39393bf38
--- /dev/null
+++ b/src/notifications/notifications.module.ts
@@ -0,0 +1,11 @@
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { NotifyConsumerService } from './notify-consumer.service';
+import { NotificationEntity } from '../payments/entities/notification.entity';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+
+@Module({
+ imports: [TypeOrmModule.forFeature([NotificationEntity, ConsumerProcessedEventEntity])],
+ providers: [NotifyConsumerService],
+})
+export class NotificationsModule {}
diff --git a/src/notifications/notify-consumer.service.ts b/src/notifications/notify-consumer.service.ts
new file mode 100644
index 0000000000..0b748b28eb
--- /dev/null
+++ b/src/notifications/notify-consumer.service.ts
@@ -0,0 +1,85 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository } from 'typeorm';
+import { randomUUID } from 'crypto';
+import { KafkaService } from '../common/kafka/kafka.service';
+import { CONSUMER_NAMES } from '../common/constants/consumer-names';
+import { TOPICS } from '../common/constants/topics';
+import { BaseEvent, PaymentFailedPayload, PaymentSettledPayload } from '../common/contracts/events';
+import { NotificationEntity } from '../payments/entities/notification.entity';
+import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
+
+@Injectable()
+export class NotifyConsumerService implements OnModuleInit {
+ private readonly logger = new Logger(NotifyConsumerService.name);
+
+ constructor(
+ private readonly configService: ConfigService,
+ private readonly kafkaService: KafkaService,
+ @InjectRepository(NotificationEntity)
+ private readonly notificationRepository: Repository,
+ @InjectRepository(ConsumerProcessedEventEntity)
+ private readonly processedRepository: Repository,
+ ) {}
+
+ async onModuleInit(): Promise {
+ const groupId = this.configService.get('KAFKA_GROUP_NOTIFY', 'payment-notify-group');
+ await this.kafkaService.subscribe({
+ groupId: `${groupId}-settled`,
+ topic: TOPICS.PAYMENT_SETTLED,
+ eachMessage: async ({ value }) => this.handleSettled(value as BaseEvent),
+ });
+ await this.kafkaService.subscribe({
+ groupId: `${groupId}-failed`,
+ topic: TOPICS.PAYMENT_FAILED,
+ eachMessage: async ({ value }) => this.handleFailed(value as BaseEvent),
+ });
+ }
+
+ private async alreadyProcessed(consumerName: string, eventId: string): Promise {
+ return Boolean(await this.processedRepository.findOne({ where: { consumerName, eventId } }));
+ }
+
+ private async markProcessed(consumerName: string, eventId: string, aggregateId: string): Promise {
+ await this.processedRepository.save(
+ this.processedRepository.create({ consumerName, eventId, aggregateId, processedAt: new Date() }),
+ );
+ }
+
+ private async handleSettled(event: BaseEvent): Promise {
+ if (await this.alreadyProcessed(CONSUMER_NAMES.NOTIFY_SETTLED, event.eventId)) {
+ return;
+ }
+
+ await this.notificationRepository.save(
+ this.notificationRepository.create({
+ id: randomUUID(),
+ paymentId: event.payload.paymentId,
+ channel: 'email',
+ status: 'sent',
+ payload: { message: `Payment ${event.payload.paymentId} settled successfully` },
+ }),
+ );
+ await this.markProcessed(CONSUMER_NAMES.NOTIFY_SETTLED, event.eventId, event.aggregateId);
+ this.logger.log(`Notification sent for settled payment ${event.payload.paymentId}`);
+ }
+
+ private async handleFailed(event: BaseEvent): Promise {
+ if (await this.alreadyProcessed(CONSUMER_NAMES.NOTIFY_FAILED, event.eventId)) {
+ return;
+ }
+
+ await this.notificationRepository.save(
+ this.notificationRepository.create({
+ id: randomUUID(),
+ paymentId: event.payload.paymentId,
+ channel: 'email',
+ status: 'sent',
+ payload: { message: `Payment ${event.payload.paymentId} failed: ${event.payload.reason}` },
+ }),
+ );
+ await this.markProcessed(CONSUMER_NAMES.NOTIFY_FAILED, event.eventId, event.aggregateId);
+ this.logger.log(`Notification sent for failed payment ${event.payload.paymentId}`);
+ }
+}
diff --git a/src/payments/dto/create-payment.dto.ts b/src/payments/dto/create-payment.dto.ts
new file mode 100644
index 0000000000..f7d55cfe74
--- /dev/null
+++ b/src/payments/dto/create-payment.dto.ts
@@ -0,0 +1,19 @@
+import { IsNotEmpty, IsNumber, IsPositive, IsString, Length } from 'class-validator';
+
+export class CreatePaymentDto {
+ @IsString()
+ @IsNotEmpty()
+ userId: string;
+
+ @IsString()
+ @Length(2, 8)
+ country: string;
+
+ @IsNumber()
+ @IsPositive()
+ amount: number;
+
+ @IsString()
+ @Length(3, 8)
+ currency: string;
+}
diff --git a/src/payments/dto/payment-response.dto.ts b/src/payments/dto/payment-response.dto.ts
new file mode 100644
index 0000000000..b72596a369
--- /dev/null
+++ b/src/payments/dto/payment-response.dto.ts
@@ -0,0 +1,8 @@
+export class PaymentResponseDto {
+ paymentId: string;
+ status: 'pending' | 'settled' | 'failed';
+ fraudStatus?: 'pending' | 'succeeded' | 'failed';
+ ledgerStatus?: 'pending' | 'succeeded' | 'failed';
+ failureReason?: string | null;
+ updatedAt: string;
+}
diff --git a/src/payments/entities/consumer-processed-event.entity.ts b/src/payments/entities/consumer-processed-event.entity.ts
new file mode 100644
index 0000000000..139b546928
--- /dev/null
+++ b/src/payments/entities/consumer-processed-event.entity.ts
@@ -0,0 +1,16 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity('consumer_processed_events')
+export class ConsumerProcessedEventEntity {
+ @PrimaryColumn({ name: 'consumer_name', type: 'varchar', length: 100 })
+ consumerName: string;
+
+ @PrimaryColumn({ name: 'event_id', type: 'uuid' })
+ eventId: string;
+
+ @Column({ name: 'aggregate_id', type: 'uuid' })
+ aggregateId: string;
+
+ @Column({ name: 'processed_at', type: 'timestamp', default: () => 'NOW()' })
+ processedAt: Date;
+}
diff --git a/src/payments/entities/ledger-entry.entity.ts b/src/payments/entities/ledger-entry.entity.ts
new file mode 100644
index 0000000000..e9fdfd48cd
--- /dev/null
+++ b/src/payments/entities/ledger-entry.entity.ts
@@ -0,0 +1,25 @@
+import { Column, CreateDateColumn, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity('ledger_entries')
+export class LedgerEntryEntity {
+ @PrimaryColumn('uuid')
+ id: string;
+
+ @Column({ name: 'payment_id', type: 'uuid' })
+ paymentId: string;
+
+ @Column({ name: 'debit_account', type: 'varchar', length: 100 })
+ debitAccount: string;
+
+ @Column({ name: 'credit_account', type: 'varchar', length: 100 })
+ creditAccount: string;
+
+ @Column({ type: 'numeric', precision: 18, scale: 2 })
+ amount: string;
+
+ @Column({ type: 'varchar', length: 8 })
+ currency: string;
+
+ @CreateDateColumn({ name: 'created_at' })
+ createdAt: Date;
+}
diff --git a/src/payments/entities/notification.entity.ts b/src/payments/entities/notification.entity.ts
new file mode 100644
index 0000000000..c27708d5a4
--- /dev/null
+++ b/src/payments/entities/notification.entity.ts
@@ -0,0 +1,22 @@
+import { Column, CreateDateColumn, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity('notifications')
+export class NotificationEntity {
+ @PrimaryColumn('uuid')
+ id: string;
+
+ @Column({ name: 'payment_id', type: 'uuid' })
+ paymentId: string;
+
+ @Column({ type: 'varchar', length: 20 })
+ channel: string;
+
+ @Column({ type: 'varchar', length: 20 })
+ status: string;
+
+ @Column({ type: 'jsonb' })
+ payload: Record;
+
+ @CreateDateColumn({ name: 'created_at' })
+ createdAt: Date;
+}
diff --git a/src/payments/entities/outbox-event.entity.ts b/src/payments/entities/outbox-event.entity.ts
new file mode 100644
index 0000000000..f6a6a82434
--- /dev/null
+++ b/src/payments/entities/outbox-event.entity.ts
@@ -0,0 +1,46 @@
+import { Column, CreateDateColumn, Entity, PrimaryColumn } from 'typeorm';
+
+@Entity('outbox_events')
+export class OutboxEventEntity {
+ @PrimaryColumn('uuid')
+ id: string;
+
+ @Column({ name: 'aggregate_id', type: 'uuid' })
+ aggregateId: string;
+
+ @Column({ name: 'aggregate_type', type: 'varchar', length: 50 })
+ aggregateType: string;
+
+ @Column({ type: 'varchar', length: 255 })
+ topic: string;
+
+ @Column({ name: 'event_type', type: 'varchar', length: 255 })
+ eventType: string;
+
+ @Column({ name: 'event_version', type: 'int' })
+ eventVersion: number;
+
+ @Column({ type: 'jsonb' })
+ payload: Record;
+
+ @Column({ type: 'jsonb', default: {} })
+ headers: Record;
+
+ @Column({ type: 'varchar', length: 20, default: 'PENDING' })
+ status: string;
+
+ @Column({ name: 'retry_count', type: 'int', default: 0 })
+ retryCount: number;
+
+ @Column({ name: 'next_retry_at', type: 'timestamp', nullable: true })
+ nextRetryAt: Date | null;
+
+ @Column({ name: 'published_at', type: 'timestamp', nullable: true })
+ publishedAt: Date | null;
+
+ @Column({ name: 'last_error', type: 'text', nullable: true })
+ lastError: string | null;
+
+ @CreateDateColumn({ name: 'created_at' })
+ createdAt: Date;
+}
diff --git a/src/payments/entities/payment-processing-status.entity.ts b/src/payments/entities/payment-processing-status.entity.ts
new file mode 100644
index 0000000000..5631cc7f98
--- /dev/null
+++ b/src/payments/entities/payment-processing-status.entity.ts
@@ -0,0 +1,22 @@
+import { Column, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
+
+@Entity('payment_processing_status')
+export class PaymentProcessingStatusEntity {
+ @PrimaryColumn('uuid', { name: 'payment_id' })
+ paymentId: string;
+
+ @Column({ name: 'fraud_status', type: 'varchar', length: 16, default: 'pending' })
+ fraudStatus: string;
+
+ @Column({ name: 'ledger_status', type: 'varchar', length: 16, default: 'pending' })
+ ledgerStatus: string;
+
+ @Column({ name: 'final_status', type: 'varchar', length: 16, default: 'pending' })
+ finalStatus: string;
+
+ @Column({ name: 'failure_reason', type: 'text', nullable: true })
+ failureReason: string | null;
+
+ @UpdateDateColumn({ name: 'updated_at' })
+ updatedAt: Date;
+}
diff --git a/src/payments/entities/payment.entity.ts b/src/payments/entities/payment.entity.ts
new file mode 100644
index 0000000000..71d3c60264
--- /dev/null
+++ b/src/payments/entities/payment.entity.ts
@@ -0,0 +1,28 @@
+import { Column, CreateDateColumn, Entity, PrimaryColumn, UpdateDateColumn } from 'typeorm';
+
+@Entity('payments')
+export class PaymentEntity {
+ @PrimaryColumn('uuid')
+ id: string;
+
+ @Column({ name: 'user_id', type: 'varchar', length: 64 })
+ userId: string;
+
+ @Column({ type: 'varchar', length: 8 })
+ country: string;
+
+ @Column({ type: 'numeric', precision: 18, scale: 2 })
+ amount: string;
+
+ @Column({ type: 'varchar', length: 8 })
+ currency: string;
+
+ @Column({ type: 'varchar', length: 16, default: 'pending' })
+ status: string;
+
+ @CreateDateColumn({ name: 'created_at' })
+ createdAt: Date;
+
+ @UpdateDateColumn({ name: 'updated_at' })
+ updatedAt: Date;
+}
diff --git a/src/payments/payments.controller.ts b/src/payments/payments.controller.ts
new file mode 100644
index 0000000000..df854dfbc8
--- /dev/null
+++ b/src/payments/payments.controller.ts
@@ -0,0 +1,19 @@
+import { Body, Controller, Get, Param, Post } from '@nestjs/common';
+import { CreatePaymentDto } from './dto/create-payment.dto';
+import { PaymentResponseDto } from './dto/payment-response.dto';
+import { PaymentService } from './payments.service';
+
+@Controller('payments')
+export class PaymentsController {
+ constructor(private readonly paymentService: PaymentService) {}
+
+ @Post()
+ async createPayment(@Body() dto: CreatePaymentDto): Promise {
+ return this.paymentService.createPayment(dto);
+ }
+
+ @Get(':id')
+ async getPaymentById(@Param('id') id: string): Promise {
+ return this.paymentService.getPaymentById(id);
+ }
+}
diff --git a/src/payments/payments.module.ts b/src/payments/payments.module.ts
new file mode 100644
index 0000000000..1f980329de
--- /dev/null
+++ b/src/payments/payments.module.ts
@@ -0,0 +1,15 @@
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { PaymentsController } from './payments.controller';
+import { PaymentService } from './payments.service';
+import { PaymentEntity } from './entities/payment.entity';
+import { OutboxEventEntity } from './entities/outbox-event.entity';
+import { PaymentProcessingStatusEntity } from './entities/payment-processing-status.entity';
+
+@Module({
+ imports: [TypeOrmModule.forFeature([PaymentEntity, OutboxEventEntity, PaymentProcessingStatusEntity])],
+ controllers: [PaymentsController],
+ providers: [PaymentService],
+ exports: [PaymentService],
+})
+export class PaymentsModule {}
diff --git a/src/payments/payments.service.ts b/src/payments/payments.service.ts
new file mode 100644
index 0000000000..f85ef632a4
--- /dev/null
+++ b/src/payments/payments.service.ts
@@ -0,0 +1,109 @@
+import { Injectable, NotFoundException } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import { DataSource, Repository } from 'typeorm';
+import { randomUUID } from 'crypto';
+import { CreatePaymentDto } from './dto/create-payment.dto';
+import { PaymentResponseDto } from './dto/payment-response.dto';
+import { PaymentEntity } from './entities/payment.entity';
+import { OutboxEventEntity } from './entities/outbox-event.entity';
+import { PaymentProcessingStatusEntity } from './entities/payment-processing-status.entity';
+import { TOPICS } from '../common/constants/topics';
+import { BaseEvent, PaymentCreatedPayload } from '../common/contracts/events';
+
+@Injectable()
+export class PaymentService {
+ constructor(
+ private readonly dataSource: DataSource,
+ @InjectRepository(PaymentEntity)
+ private readonly paymentRepository: Repository,
+ @InjectRepository(PaymentProcessingStatusEntity)
+ private readonly processingStatusRepository: Repository,
+ ) {}
+
+ async createPayment(dto: CreatePaymentDto): Promise {
+ const paymentId = randomUUID();
+ const eventId = randomUUID();
+ const now = new Date();
+
+ await this.dataSource.transaction(async (manager) => {
+ const payment = manager.create(PaymentEntity, {
+ id: paymentId,
+ userId: dto.userId,
+ country: dto.country,
+ amount: dto.amount.toFixed(2),
+ currency: dto.currency,
+ status: 'pending',
+ });
+ await manager.save(payment);
+
+ const processingStatus = manager.create(PaymentProcessingStatusEntity, {
+ paymentId,
+ fraudStatus: 'pending',
+ ledgerStatus: 'pending',
+ finalStatus: 'pending',
+ failureReason: null,
+ });
+ await manager.save(processingStatus);
+
+ const event: BaseEvent = {
+ eventId,
+ eventType: 'payment.created',
+ eventVersion: 1,
+ occurredAt: now.toISOString(),
+ correlationId: eventId,
+ aggregateId: paymentId,
+ aggregateType: 'payment',
+ country: dto.country,
+ payload: {
+ paymentId,
+ userId: dto.userId,
+ amount: dto.amount,
+ currency: dto.currency,
+ country: dto.country,
+ createdAt: now.toISOString(),
+ },
+ };
+
+ const outbox = manager.create(OutboxEventEntity, {
+ id: randomUUID(),
+ aggregateId: paymentId,
+ aggregateType: 'payment',
+ topic: TOPICS.PAYMENT_CREATED,
+ eventType: 'payment.created',
+ eventVersion: 1,
+ payload: event as unknown as Record,
+ headers: { correlationId: event.correlationId },
+ status: 'PENDING',
+ retryCount: 0,
+ nextRetryAt: null,
+ publishedAt: null,
+ lastError: null,
+ });
+ await manager.save(outbox);
+ });
+
+ return {
+ paymentId,
+ status: 'pending',
+ updatedAt: now.toISOString(),
+ };
+ }
+
+ async getPaymentById(paymentId: string): Promise {
+ const payment = await this.paymentRepository.findOne({ where: { id: paymentId } });
+ if (!payment) {
+ throw new NotFoundException(`Payment ${paymentId} not found`);
+ }
+
+ const processing = await this.processingStatusRepository.findOne({ where: { paymentId } });
+
+ return {
+ paymentId: payment.id,
+ status: payment.status as 'pending' | 'settled' | 'failed',
+ fraudStatus: processing?.fraudStatus as 'pending' | 'succeeded' | 'failed',
+ ledgerStatus: processing?.ledgerStatus as 'pending' | 'succeeded' | 'failed',
+ failureReason: processing?.failureReason ?? null,
+ updatedAt: payment.updatedAt.toISOString(),
+ };
+ }
+}
diff --git a/src/relay/outbox-relay.service.ts b/src/relay/outbox-relay.service.ts
new file mode 100644
index 0000000000..b2c01ded0d
--- /dev/null
+++ b/src/relay/outbox-relay.service.ts
@@ -0,0 +1,76 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { InjectRepository } from '@nestjs/typeorm';
+import { LessThanOrEqual, Repository } from 'typeorm';
+import { OutboxEventEntity } from '../payments/entities/outbox-event.entity';
+import { KafkaService } from '../common/kafka/kafka.service';
+
+@Injectable()
+export class OutboxRelayService implements OnModuleInit {
+ private readonly logger = new Logger(OutboxRelayService.name);
+ private isProcessing = false;
+
+ constructor(
+ private readonly configService: ConfigService,
+ @InjectRepository(OutboxEventEntity)
+ private readonly outboxRepository: Repository,
+ private readonly kafkaService: KafkaService,
+ ) {}
+
+ onModuleInit(): void {
+ const interval = Number(this.configService.get('OUTBOX_POLL_INTERVAL_MS', 5000));
+ setInterval(() => {
+ void this.processPendingEvents();
+ }, interval);
+ }
+
+ async processPendingEvents(): Promise {
+ if (this.isProcessing) {
+ return;
+ }
+
+ this.isProcessing = true;
+
+ try {
+ const batchSize = Number(this.configService.get('OUTBOX_BATCH_SIZE', 50));
+ const events = await this.outboxRepository.find({
+ where: [
+ { status: 'PENDING' },
+ { status: 'FAILED', nextRetryAt: LessThanOrEqual(new Date()) },
+ ],
+ order: { createdAt: 'ASC' },
+ take: batchSize,
+ });
+
+ if (!events.length) {
+ return;
+ }
+
+ this.logger.log(`Found ${events.length} outbox event(s) to process`);
+
+ for (const event of events) {
+ try {
+ await this.kafkaService.publish(event.topic, event.aggregateId, event.payload, event.headers);
+ event.status = 'PUBLISHED';
+ event.publishedAt = new Date();
+ event.lastError = null;
+ event.nextRetryAt = null;
+ await this.outboxRepository.save(event);
+ this.logger.log(`Published outbox event ${event.id} to topic ${event.topic}`);
+ } catch (error) {
+ const retryCount = event.retryCount + 1;
+ event.retryCount = retryCount;
+ event.status = 'FAILED';
+ event.lastError = error instanceof Error ? error.message : 'Unknown publish error';
+ event.nextRetryAt = new Date(Date.now() + Math.min(retryCount * 5000, 60000));
+ await this.outboxRepository.save(event);
+ this.logger.error(
+ `Failed publishing outbox event ${event.id}. Retry count: ${retryCount}. Error: ${event.lastError}`,
+ );
+ }
+ }
+ } finally {
+ this.isProcessing = false;
+ }
+ }
+}
diff --git a/src/relay/relay.module.ts b/src/relay/relay.module.ts
new file mode 100644
index 0000000000..87a1acf749
--- /dev/null
+++ b/src/relay/relay.module.ts
@@ -0,0 +1,11 @@
+import { Module } from '@nestjs/common';
+import { TypeOrmModule } from '@nestjs/typeorm';
+import { OutboxEventEntity } from '../payments/entities/outbox-event.entity';
+import { OutboxRelayService } from './outbox-relay.service';
+
+@Module({
+ imports: [TypeOrmModule.forFeature([OutboxEventEntity])],
+ providers: [OutboxRelayService],
+ exports: [OutboxRelayService],
+})
+export class RelayModule {}
diff --git a/tsconfig.build.json b/tsconfig.build.json
new file mode 100644
index 0000000000..64f86c6bd2
--- /dev/null
+++ b/tsconfig.build.json
@@ -0,0 +1,4 @@
+{
+ "extends": "./tsconfig.json",
+ "exclude": ["node_modules", "test", "dist", "**/*spec.ts"]
+}
diff --git a/tsconfig.json b/tsconfig.json
new file mode 100644
index 0000000000..47b67cebc4
--- /dev/null
+++ b/tsconfig.json
@@ -0,0 +1,21 @@
+{
+ "compilerOptions": {
+ "module": "commonjs",
+ "declaration": true,
+ "removeComments": true,
+ "emitDecoratorMetadata": true,
+ "experimentalDecorators": true,
+ "allowSyntheticDefaultImports": true,
+ "target": "ES2021",
+ "sourceMap": true,
+ "outDir": "./dist",
+ "baseUrl": "./",
+ "incremental": true,
+ "skipLibCheck": true,
+ "strict": true,
+ "moduleResolution": "node",
+ "esModuleInterop": true
+ },
+ "include": ["src/**/*.ts"],
+ "exclude": ["node_modules", "dist"]
+}