Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
**/dist
**/node_modules
**/.git
36 changes: 36 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Kafka
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=wallet-transfer-saga
KAFKA_CONSUMER_GROUP_ORCHESTRATOR=orchestrator-group
KAFKA_CONSUMER_GROUP_WALLET=wallet-service-group
KAFKA_CONSUMER_GROUP_FX=fx-service-group
KAFKA_CONSUMER_GROUP_RECEIPT=receipt-service-group
KAFKA_CONSUMER_GROUP_QUERY=query-service-group

# Kafka interna (inter-broker dentro de docker-compose)
KAFKA_INTERNAL_PORT=29092
KAFKA_EXTERNAL_PORT=9092
ZOOKEEPER_PORT=2181

# PostgreSQL
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=saga_user
POSTGRES_PASSWORD=saga_pass
POSTGRES_DB=wallet_saga

# Servicios — puertos HTTP
ORCHESTRATOR_PORT=3001
QUERY_SERVICE_PORT=3005

# FX mock
FX_MOCK_PORT=4000
FX_MOCK_DEFAULT_DELAY_MS=300
FX_MOCK_DEFAULT_RATE=3.72

# FX service — timeout en ms para considerar estado ambiguo
FX_API_URL=http://fx-mock:4000
FX_TIMEOUT_MS=5000

# Money — unidad mínima (centavos). Cambiar solo si la moneda usa otra subdivisión.
MONEY_SUBUNIT_FACTOR=100
27 changes: 27 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Dependencies
node_modules/

# Build output
dist/
**/dist/

# Environment files — nunca subir secrets reales
.env
.env.local
.env.production

# Logs
*.log
npm-debug.log*

# OS
.DS_Store
Thumbs.db

# IDE
.vscode/
.idea/

# Jest cache
coverage/
.jest-cache/
434 changes: 434 additions & 0 deletions ADR.md

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Dockerfile.base
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM node:22-alpine AS base
WORKDIR /app
COPY package.json package-lock.json tsconfig.json ./
COPY apps/orchestrator/package.json ./apps/orchestrator/
COPY apps/wallet-service/package.json ./apps/wallet-service/
COPY apps/fx-service/package.json ./apps/fx-service/
COPY apps/receipt-service/package.json ./apps/receipt-service/
COPY apps/query-service/package.json ./apps/query-service/
RUN npm ci --workspaces --include-workspace-root
COPY libs/ ./libs/
COPY apps/ ./apps/
425 changes: 303 additions & 122 deletions README.md

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions apps/fx-service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM wallet-transfer-saga-base AS builder
WORKDIR /app/apps/fx-service
RUN ../../node_modules/.bin/nest build

FROM node:22-alpine
WORKDIR /app
COPY --from=builder /app/apps/fx-service/dist/main.js ./main.js
COPY --from=builder /app/node_modules ./node_modules
CMD ["node", "main.js"]
9 changes: 9 additions & 0 deletions apps/fx-service/nest-cli.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"collection": "@nestjs/schematics",
"sourceRoot": "src",
"compilerOptions": {
"webpack": true,
"webpackConfigPath": "webpack.config.js",
"tsConfigPath": "tsconfig.json"
}
}
26 changes: 26 additions & 0 deletions apps/fx-service/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "@saga/fx-service",
"version": "1.0.0",
"scripts": {
"build": "nest build",
"start": "node dist/main",
"start:dev": "nest start --watch",
"test": "jest"
},
"dependencies": {
"@nestjs/common": "^11.1.18",
"@nestjs/core": "^11.1.18",
"@nestjs/microservices": "^11.1.18",
"kafkajs": "^2.2.4",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"uuid": "^11.0.0"
},
"devDependencies": {
"@nestjs/cli": "^11.0.0",
"@nestjs/testing": "^11.1.18",
"@types/uuid": "^10.0.0",
"jest": "^29.7.0",
"ts-jest": "^29.2.0"
}
}
28 changes: 28 additions & 0 deletions apps/fx-service/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { FxModule } from './fx/fx.module';
import { SettleFxConsumer } from './consumers/settle-fx.consumer';

@Module({
imports: [
ClientsModule.register([
{
name: 'KAFKA_CLIENT',
transport: Transport.KAFKA,
options: {
client: {
clientId: process.env.KAFKA_CLIENT_ID,
brokers: (process.env.KAFKA_BROKERS ?? '').split(','),
},
consumer: {
groupId: process.env.KAFKA_CONSUMER_GROUP_FX ?? 'fx-service-group',
},
producerOnlyMode: true,
},
},
]),
FxModule,
],
controllers: [SettleFxConsumer],
})
export class AppModule {}
60 changes: 60 additions & 0 deletions apps/fx-service/src/consumers/settle-fx.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Controller, Inject, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload, ClientKafka } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';
import { KafkaTopic } from '@contracts/topics';
import { SettleFxCommand } from '@contracts/commands';
import { FxSettledEvent, FxFailedEvent, FxAmbiguousEvent } from '@contracts/events';
import { FxService } from '../fx/fx.service';

@Controller()
export class SettleFxConsumer implements OnModuleInit {
private readonly logger = new Logger(SettleFxConsumer.name);

constructor(
private readonly fxService: FxService,
@Inject('KAFKA_CLIENT') private readonly kafka: ClientKafka,
) {}

async onModuleInit(): Promise<void> {
await this.kafka.connect();
}

@EventPattern(KafkaTopic.SETTLE_FX)
async handle(@Payload() cmd: SettleFxCommand): Promise<void> {
const result = await this.fxService.settle(
cmd.sagaId,
cmd.fromCurrency,
cmd.toCurrency,
);

if (result.status === 'settled') {
const event: FxSettledEvent = {
sagaId: cmd.sagaId,
fromCurrency: cmd.fromCurrency,
toCurrency: cmd.toCurrency,
rate: result.rate,
settledAt: result.settledAt,
};
await firstValueFrom(this.kafka.emit(KafkaTopic.FX_SETTLED, event));
return;
}

if (result.status === 'ambiguous') {
const event: FxAmbiguousEvent = {
sagaId: cmd.sagaId,
timeoutMs: result.timeoutMs,
attemptedAt: result.attemptedAt,
};
await firstValueFrom(this.kafka.emit(KafkaTopic.FX_AMBIGUOUS, event));
this.logger.error(`FX ambiguous for saga ${cmd.sagaId} — timeout after ${result.timeoutMs}ms`);
return;
}

const event: FxFailedEvent = {
sagaId: cmd.sagaId,
reason: result.reason,
};
await firstValueFrom(this.kafka.emit(KafkaTopic.FX_FAILED, event));
this.logger.warn(`FX failed for saga ${cmd.sagaId}: ${result.reason}`);
}
}
44 changes: 44 additions & 0 deletions apps/fx-service/src/fx/fx-rate.provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Injectable, Logger } from '@nestjs/common';

export interface FxRateResponse {
rate: number;
settled: boolean;
}

@Injectable()
export class FxRateProvider {
private readonly logger = new Logger(FxRateProvider.name);
private readonly apiUrl = process.env.FX_API_URL;
private readonly timeoutMs = parseInt(process.env.FX_TIMEOUT_MS ?? '5000');

async getRate(fromCurrency: string, toCurrency: string): Promise<FxRateResponse> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeoutMs);

try {
const url = `${this.apiUrl}?from=${fromCurrency}&to=${toCurrency}`;
const response = await fetch(url, { signal: controller.signal });

if (!response.ok) {
throw new Error(`FX API responded with status ${response.status}`);
}

return response.json() as Promise<FxRateResponse>;
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
this.logger.error(`FX API timed out after ${this.timeoutMs}ms`);
throw new FxTimeoutError(this.timeoutMs);
}
throw error;
} finally {
clearTimeout(timeoutId);
}
}
}

export class FxTimeoutError extends Error {
constructor(public readonly timeoutMs: number) {
super(`FX API timed out after ${timeoutMs}ms`);
this.name = 'FxTimeoutError';
}
}
9 changes: 9 additions & 0 deletions apps/fx-service/src/fx/fx.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { FxRateProvider } from './fx-rate.provider';
import { FxService } from './fx.service';

@Module({
providers: [FxRateProvider, FxService],
exports: [FxService],
})
export class FxModule {}
51 changes: 51 additions & 0 deletions apps/fx-service/src/fx/fx.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Injectable, Logger } from '@nestjs/common';
import { FxRateProvider, FxTimeoutError } from './fx-rate.provider';
import { buildIdempotencyKey } from '@helpers/idempotency-key.helper';

export type FxSettleResult =
| { status: 'settled'; rate: number; settledAt: string }
| { status: 'failed'; reason: string }
| { status: 'ambiguous'; timeoutMs: number; attemptedAt: string };

@Injectable()
export class FxService {
private readonly logger = new Logger(FxService.name);

// In-memory deduplication — evita llamar dos veces a la API externa por el mismo sagaId.
// En producción reemplazar con Redis o tabla DB para sobrevivir reinicios del servicio.
private readonly processed = new Map<string, FxSettleResult>();

constructor(private readonly fxRateProvider: FxRateProvider) {}

async settle(sagaId: string, fromCurrency: string, toCurrency: string): Promise<FxSettleResult> {
const idempotencyKey = buildIdempotencyKey(sagaId, 'fx-settle');

const cached = this.processed.get(idempotencyKey);
if (cached) {
this.logger.log(`FX settle ${idempotencyKey} already processed — returning cached result`);
return cached;
}

const result = await this.callProvider(fromCurrency, toCurrency);
this.processed.set(idempotencyKey, result);
return result;
}

private async callProvider(fromCurrency: string, toCurrency: string): Promise<FxSettleResult> {
try {
const { rate } = await this.fxRateProvider.getRate(fromCurrency, toCurrency);
return { status: 'settled', rate, settledAt: new Date().toISOString() };
} catch (error) {
if (error instanceof FxTimeoutError) {
return {
status: 'ambiguous',
timeoutMs: error.timeoutMs,
attemptedAt: new Date().toISOString(),
};
}
const reason = error instanceof Error ? error.message : 'UNKNOWN';
this.logger.error(`FX provider error: ${reason}`);
return { status: 'failed', reason };
}
}
}
29 changes: 29 additions & 0 deletions apps/fx-service/src/main.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
import { KafkaExceptionFilter } from '@common/filters/kafka-exception.filter';

async function bootstrap(): Promise<void> {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
clientId: process.env.KAFKA_CLIENT_ID,
brokers: (process.env.KAFKA_BROKERS ?? '').split(','),
retry: { retries: 10, initialRetryTime: 300 },
},
consumer: {
groupId: process.env.KAFKA_CONSUMER_GROUP_FX ?? 'fx-service-group',
},
},
},
);

app.enableShutdownHooks(); // drain in-flight messages before SIGTERM kills the process
app.useGlobalFilters(new KafkaExceptionFilter());
await app.listen();
}

bootstrap();
Loading