Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Dependencies
node_modules/

# Build output
dist/
build/
*.tsbuildinfo

# Test output
coverage/
.nyc_output/

# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*

# Environment
.env
.env.*
!.env.example

# OS / Editor
.DS_Store
Thumbs.db
.idea/
.vscode/

# Docker / local runtime
.docker/
tmp/
temp/

# Prisma local artifacts
packages/db/prisma/dev.db
packages/db/prisma/dev.db-journal

381 changes: 194 additions & 187 deletions README.md

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions apps/fraud-worker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM node:20-alpine
WORKDIR /app

RUN corepack enable

COPY package.json pnpm-workspace.yaml tsconfig.base.json tsconfig.json jest.config.ts ./
COPY apps ./apps
COPY packages ./packages

RUN pnpm install --no-frozen-lockfile
RUN pnpm prisma:generate

CMD ["pnpm", "start:fraud-worker"]
40 changes: 40 additions & 0 deletions apps/fraud-worker/src/dlt.publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Kafka, Producer } from 'kafkajs';
import type { PaymentFailedV1 } from '@contracts/payment-failed.v1';
import type { PaymentSettledV1 } from '@contracts/payment-settled.v1';
import { TOPIC_NAMES } from '@shared/topic-names';

@Injectable()
export class DltPublisher implements OnModuleInit, OnModuleDestroy {
private readonly producer: Producer;

constructor() {
const kafka = new Kafka({
clientId: 'fraud-worker-publisher',
brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(',')
});
this.producer = kafka.producer();
}

async onModuleInit(): Promise<void> {
await this.producer.connect();
}

async onModuleDestroy(): Promise<void> {
await this.producer.disconnect();
}

async publishFailed(event: PaymentFailedV1): Promise<void> {
await this.producer.send({
topic: TOPIC_NAMES.paymentFailedV1,
messages: [{ value: JSON.stringify(event) }]
});
}

async publishSettled(event: PaymentSettledV1): Promise<void> {
await this.producer.send({
topic: TOPIC_NAMES.paymentSettledV1,
messages: [{ value: JSON.stringify(event) }]
});
}
}
121 changes: 121 additions & 0 deletions apps/fraud-worker/src/fraud.consumer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { FraudConsumer } from './fraud.consumer';
import type { PaymentCreatedV1 } from '@contracts/payment-created.v1';

describe('FraudConsumer idempotency', () => {
it('does not produce duplicate side effects for the same eventId', async () => {
const envelope: PaymentCreatedV1 = {
eventId: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa',
eventType: 'payment.created.v1',
occurredAt: new Date().toISOString(),
aggregateId: 'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
traceId: 'trace-1',
payload: {
countryCode: 'PE',
amount: 99,
currency: 'PEN'
}
};

const prisma = {
$transaction: jest.fn(async (cb: (tx: unknown) => Promise<unknown>) => cb({}))
};

const fraudRepository = {
createResultAndAdvancePayment: jest.fn(async () => ({ settled: false })),
markPaymentFailed: jest.fn(async () => undefined)
};

const fraudService = {
evaluate: jest.fn(() => ({ decision: 'approve', riskLevel: 'low' }))
};

const processedEventsRepository = {
register: jest
.fn<Promise<boolean>, [unknown, string, string]>()
.mockResolvedValueOnce(true)
.mockResolvedValueOnce(false)
};

const dltPublisher = {
publishFailed: jest.fn(async () => undefined),
publishSettled: jest.fn(async () => undefined)
};

const consumer = new FraudConsumer(
prisma as never,
fraudRepository as never,
fraudService as never,
processedEventsRepository as never,
dltPublisher as never
);

await consumer.processEnvelope(envelope);
await consumer.processEnvelope(envelope);

expect(processedEventsRepository.register).toHaveBeenCalledTimes(2);
expect(fraudRepository.createResultAndAdvancePayment).toHaveBeenCalledTimes(1);
expect(dltPublisher.publishFailed).not.toHaveBeenCalled();
});

it('registers reject decision with reason for high-value payment and publishes failed event', async () => {
const envelope: PaymentCreatedV1 = {
eventId: 'eeeeeeee-eeee-eeee-eeee-eeeeeeeeeeee',
eventType: 'payment.created.v1',
occurredAt: new Date().toISOString(),
aggregateId: 'ffffffff-ffff-ffff-ffff-ffffffffffff',
traceId: 'trace-2',
payload: {
countryCode: 'PE',
amount: 10000,
currency: 'PEN'
}
};

const prisma = {
$transaction: jest.fn(async (cb: (tx: unknown) => Promise<unknown>) => cb({}))
};

const fraudRepository = {
createResultAndAdvancePayment: jest.fn(async () => ({ settled: false })),
createRejectedResultAndMarkPaymentFailed: jest.fn(async () => undefined),
markPaymentFailed: jest.fn(async () => undefined)
};

const fraudService = {
evaluate: jest.fn(() => ({
decision: 'reject' as const,
riskLevel: 'high' as const,
reason: 'Fraud engine failed for high-value payment'
}))
};

const processedEventsRepository = {
register: jest.fn<Promise<boolean>, [unknown, string, string]>().mockResolvedValue(true)
};

const dltPublisher = {
publishFailed: jest.fn(async () => undefined),
publishSettled: jest.fn(async () => undefined)
};

const consumer = new FraudConsumer(
prisma as never,
fraudRepository as never,
fraudService as never,
processedEventsRepository as never,
dltPublisher as never
);

await consumer.processEnvelope(envelope);

expect(fraudRepository.createRejectedResultAndMarkPaymentFailed).toHaveBeenCalledWith(
expect.anything(),
{
paymentId: envelope.aggregateId,
reason: 'Fraud engine failed for high-value payment'
}
);
expect(fraudRepository.createResultAndAdvancePayment).not.toHaveBeenCalled();
expect(dltPublisher.publishFailed).toHaveBeenCalledTimes(1);
});
});
150 changes: 150 additions & 0 deletions apps/fraud-worker/src/fraud.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { randomUUID } from 'crypto';
import { Inject, Injectable } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import type { PaymentCreatedV1 } from '@contracts/payment-created.v1';
import type { PaymentFailedV1 } from '@contracts/payment-failed.v1';
import type { PaymentSettledV1 } from '@contracts/payment-settled.v1';
import { TOPIC_NAMES } from '@shared/topic-names';
import { newTraceId } from '@shared/correlation';
import { MAX_RETRIES, RETRY_DELAY_MS, sleep } from '@shared/retry-policy';
import { createLogger } from '@shared/logger';
import { FraudRepository } from './fraud.repository';
import { FraudService } from './fraud.service';
import { ProcessedEventsRepository } from './processed-events.repository';
import { DltPublisher } from './dlt.publisher';

@Injectable()
export class FraudConsumer {
private readonly logger = createLogger(FraudConsumer.name);
private static readonly consumerName = 'fraud-worker';

constructor(
@Inject(PrismaClient) private readonly prisma: PrismaClient,
@Inject(FraudRepository)
private readonly fraudRepository: FraudRepository,
@Inject(FraudService) private readonly fraudService: FraudService,
@Inject(ProcessedEventsRepository)
private readonly processedEventsRepository: ProcessedEventsRepository,
@Inject(DltPublisher) private readonly dltPublisher: DltPublisher
) {}

async processEnvelope(envelope: PaymentCreatedV1): Promise<void> {
type ProcessingResult =
| { duplicate: true; settled: false; rejected: false; rejectReason: null }
| { duplicate: false; settled: boolean; rejected: false; rejectReason: null }
| { duplicate: false; settled: false; rejected: true; rejectReason: string };

for (let attempt = 1; attempt <= MAX_RETRIES; attempt += 1) {
try {
const result: ProcessingResult = await this.prisma.$transaction(async (tx) => {
const isNew = await this.processedEventsRepository.register(
tx,
FraudConsumer.consumerName,
envelope.eventId
);

if (!isNew) {
return { duplicate: true, settled: false, rejected: false, rejectReason: null };
}

const evaluated = this.fraudService.evaluate(envelope.payload.amount);

if (evaluated.decision === 'reject') {
await this.fraudRepository.createRejectedResultAndMarkPaymentFailed(tx, {
paymentId: envelope.aggregateId,
reason: evaluated.reason
});

return {
duplicate: false,
settled: false,
rejected: true,
rejectReason: evaluated.reason
};
}

const advance = await this.fraudRepository.createResultAndAdvancePayment(tx, {
paymentId: envelope.aggregateId,
decision: evaluated.decision,
riskLevel: evaluated.riskLevel
});

return {
duplicate: false,
settled: advance.settled,
rejected: false,
rejectReason: null
};
});

if (result.duplicate) {
this.logger.log(`Skipping duplicated event ${envelope.eventId}`);
return;
}

if (result.rejected) {
await this.publishFailed(envelope.aggregateId, envelope.traceId, result.rejectReason);
this.logger.log(
`Rejected payment ${envelope.aggregateId} from event ${envelope.eventId}: ${result.rejectReason}`
);
return;
}

this.logger.log(
`Processed new event ${envelope.eventId} for payment ${envelope.aggregateId} successfully`
);

if (result.settled) {
await this.publishSettled(envelope.aggregateId, envelope.traceId);
}

return;
} catch (error) {
if (attempt < MAX_RETRIES) {
await sleep(RETRY_DELAY_MS);
continue;
}

await this.prisma.$transaction((tx) =>
this.fraudRepository.markPaymentFailed(tx, envelope.aggregateId)
);

const reason =
error instanceof Error ? error.message : 'Unexpected fraud processing error';
await this.publishFailed(envelope.aggregateId, envelope.traceId, reason);
this.logger.error(`Event ${envelope.eventId} sent to DLT after retry exhaustion`);
}
}
}

private async publishFailed(paymentId: string, traceId: string, reason: string): Promise<void> {
const failedEvent: PaymentFailedV1 = {
eventId: randomUUID(),
eventType: TOPIC_NAMES.paymentFailedV1,
occurredAt: new Date().toISOString(),
aggregateId: paymentId,
traceId,
payload: {
reason,
failedBy: 'fraud-worker'
}
};

await this.dltPublisher.publishFailed(failedEvent);
}

private async publishSettled(paymentId: string, traceId: string): Promise<void> {
const settledEvent: PaymentSettledV1 = {
eventId: randomUUID(),
eventType: TOPIC_NAMES.paymentSettledV1,
occurredAt: new Date().toISOString(),
aggregateId: paymentId,
traceId,
payload: {
settledAt: new Date().toISOString()
}
};

await this.dltPublisher.publishSettled(settledEvent);
}
}
26 changes: 26 additions & 0 deletions apps/fraud-worker/src/fraud.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Module } from '@nestjs/common';
import { PrismaClient } from '@prisma/client';
import { FraudConsumer } from './fraud.consumer';
import { FraudService } from './fraud.service';
import { FraudRepository } from './fraud.repository';
import { ProcessedEventsRepository } from './processed-events.repository';
import { DltPublisher } from './dlt.publisher';

@Module({
providers: [
{
provide: PrismaClient,
useFactory: async () => {
const client = new PrismaClient();
await client.$connect();
return client;
}
},
FraudConsumer,
FraudService,
FraudRepository,
ProcessedEventsRepository,
DltPublisher
]
})
export class FraudModule {}
Loading