Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
590 changes: 538 additions & 52 deletions README.md

Large diffs are not rendered by default.

53 changes: 39 additions & 14 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
version: "3.7"
version: '3.9'

services:
postgres:
image: postgres:14
image: postgres:16
container_name: payments-postgres
restart: always
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: payments_db
ports:
- "5432:5432"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
volumes:
- postgres_data:/var/lib/postgresql/data
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql

zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
image: confluentinc/cp-zookeeper:7.5.0
container_name: payments-zookeeper
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on: [zookeeper]
image: confluentinc/cp-kafka:7.5.0
container_name: payments-kafka
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

volumes:
postgres_data:
5 changes: 5 additions & 0 deletions nest-cli.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"$schema": "https://json.schemastore.org/nest-cli",
"collection": "@nestjs/schematics",
"sourceRoot": "src"
}
38 changes: 38 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "payment-settlement-challenge",
"version": "1.0.0",
"description": "Challenge 1 - payment settlement pipeline with transactional outbox, idempotent consumers and status aggregation",
"private": true,
"license": "UNLICENSED",
"scripts": {
"build": "nest build",
"start": "nest start",
"start:dev": "nest start --watch",
"start:prod": "node dist/main",
"lint": "echo 'lint not configured'",
"test": "echo 'tests not configured'"
},
"dependencies": {
"@nestjs/common": "^10.4.2",
"@nestjs/config": "^3.2.3",
"@nestjs/core": "^10.4.2",
"@nestjs/platform-express": "^10.4.2",
"@nestjs/typeorm": "^10.0.2",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.1",
"kafkajs": "^2.2.4",
"pg": "^8.12.0",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"typeorm": "^0.3.20"
},
"devDependencies": {
"@nestjs/cli": "^10.4.5",
"@nestjs/schematics": "^10.1.4",
"@nestjs/testing": "^10.4.2",
"@types/node": "^20.14.15",
"ts-loader": "^9.5.1",
"ts-node": "^10.9.2",
"typescript": "^5.5.4"
}
}
66 changes: 66 additions & 0 deletions sql/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
CREATE TABLE IF NOT EXISTS payments (
id UUID PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
country VARCHAR(8) NOT NULL,
amount NUMERIC(18,2) NOT NULL,
currency VARCHAR(8) NOT NULL,
status VARCHAR(16) NOT NULL DEFAULT 'pending',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS outbox_events (
id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(50) NOT NULL,
topic VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_version INT NOT NULL,
payload JSONB NOT NULL,
headers JSONB NOT NULL DEFAULT '{}'::jsonb,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
next_retry_at TIMESTAMP NULL,
published_at TIMESTAMP NULL,
last_error TEXT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_outbox_status_next_retry
ON outbox_events(status, next_retry_at, created_at);

CREATE TABLE IF NOT EXISTS payment_processing_status (
payment_id UUID PRIMARY KEY,
fraud_status VARCHAR(16) NOT NULL DEFAULT 'pending',
ledger_status VARCHAR(16) NOT NULL DEFAULT 'pending',
final_status VARCHAR(16) NOT NULL DEFAULT 'pending',
failure_reason TEXT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS consumer_processed_events (
consumer_name VARCHAR(100) NOT NULL,
event_id UUID NOT NULL,
aggregate_id UUID NOT NULL,
processed_at TIMESTAMP NOT NULL DEFAULT NOW(),
PRIMARY KEY (consumer_name, event_id)
);

CREATE TABLE IF NOT EXISTS ledger_entries (
id UUID PRIMARY KEY,
payment_id UUID NOT NULL,
debit_account VARCHAR(100) NOT NULL,
credit_account VARCHAR(100) NOT NULL,
amount NUMERIC(18,2) NOT NULL,
currency VARCHAR(8) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS notifications (
id UUID PRIMARY KEY,
payment_id UUID NOT NULL,
channel VARCHAR(20) NOT NULL,
status VARCHAR(20) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
12 changes: 12 additions & 0 deletions src/aggregator/aggregator.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { StatusAggregatorService } from './status-aggregator.service';
import { PaymentEntity } from '../payments/entities/payment.entity';
import { PaymentProcessingStatusEntity } from '../payments/entities/payment-processing-status.entity';
import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';

@Module({
imports: [TypeOrmModule.forFeature([PaymentEntity, PaymentProcessingStatusEntity, ConsumerProcessedEventEntity])],
providers: [StatusAggregatorService],
})
export class AggregatorModule {}
126 changes: 126 additions & 0 deletions src/aggregator/status-aggregator.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { KafkaService } from '../common/kafka/kafka.service';
import { CONSUMER_NAMES } from '../common/constants/consumer-names';
import { TOPICS } from '../common/constants/topics';
import {
BaseEvent,
PaymentFailedPayload,
PaymentFraudReviewedPayload,
PaymentLedgerWrittenPayload,
PaymentSettledPayload,
} from '../common/contracts/events';
import { PaymentEntity } from '../payments/entities/payment.entity';
import { PaymentProcessingStatusEntity } from '../payments/entities/payment-processing-status.entity';
import { ConsumerProcessedEventEntity } from '../payments/entities/consumer-processed-event.entity';
import { randomUUID } from 'crypto';

@Injectable()
export class StatusAggregatorService implements OnModuleInit {
private readonly logger = new Logger(StatusAggregatorService.name);

constructor(
private readonly configService: ConfigService,
private readonly kafkaService: KafkaService,
@InjectRepository(PaymentEntity)
private readonly paymentRepository: Repository<PaymentEntity>,
@InjectRepository(PaymentProcessingStatusEntity)
private readonly statusRepository: Repository<PaymentProcessingStatusEntity>,
@InjectRepository(ConsumerProcessedEventEntity)
private readonly processedRepository: Repository<ConsumerProcessedEventEntity>,
) {}

async onModuleInit(): Promise<void> {
const groupId = this.configService.get<string>('KAFKA_GROUP_STATUS', 'payment-status-group');

await this.kafkaService.subscribe({
groupId: `${groupId}-fraud`,
topic: TOPICS.PAYMENT_FRAUD_REVIEWED,
eachMessage: async ({ value }) => this.handleFraudReviewed(value as BaseEvent<PaymentFraudReviewedPayload>),
});
await this.kafkaService.subscribe({
groupId: `${groupId}-ledger`,
topic: TOPICS.PAYMENT_LEDGER_WRITTEN,
eachMessage: async ({ value }) => this.handleLedgerWritten(value as BaseEvent<PaymentLedgerWrittenPayload>),
});
await this.kafkaService.subscribe({
groupId: `${groupId}-failed`,
topic: TOPICS.PAYMENT_FAILED,
eachMessage: async ({ value }) => this.handleFailed(value as BaseEvent<PaymentFailedPayload>),
});
}

private async wasProcessed(consumerName: string, eventId: string): Promise<boolean> {
return Boolean(await this.processedRepository.findOne({ where: { consumerName, eventId } }));
}

private async markProcessed(consumerName: string, eventId: string, aggregateId: string): Promise<void> {
await this.processedRepository.save(
this.processedRepository.create({ consumerName, eventId, aggregateId, processedAt: new Date() }),
);
}

private async handleFraudReviewed(event: BaseEvent<PaymentFraudReviewedPayload>): Promise<void> {
if (await this.wasProcessed(CONSUMER_NAMES.STATUS_FRAUD, event.eventId)) {
return;
}
await this.statusRepository.update({ paymentId: event.payload.paymentId }, { fraudStatus: 'succeeded' });
await this.tryFinalize(event.payload.paymentId, event);
await this.markProcessed(CONSUMER_NAMES.STATUS_FRAUD, event.eventId, event.aggregateId);
}

private async handleLedgerWritten(event: BaseEvent<PaymentLedgerWrittenPayload>): Promise<void> {
if (await this.wasProcessed(CONSUMER_NAMES.STATUS_LEDGER, event.eventId)) {
return;
}
await this.statusRepository.update({ paymentId: event.payload.paymentId }, { ledgerStatus: 'succeeded' });
await this.tryFinalize(event.payload.paymentId, event);
await this.markProcessed(CONSUMER_NAMES.STATUS_LEDGER, event.eventId, event.aggregateId);
}

private async handleFailed(event: BaseEvent<PaymentFailedPayload>): Promise<void> {
if (await this.wasProcessed(CONSUMER_NAMES.STATUS_FAILED, event.eventId)) {
return;
}

await this.statusRepository.update(
{ paymentId: event.payload.paymentId },
{ finalStatus: 'failed', failureReason: event.payload.reason },
);
await this.paymentRepository.update({ id: event.payload.paymentId }, { status: 'failed' });
await this.markProcessed(CONSUMER_NAMES.STATUS_FAILED, event.eventId, event.aggregateId);
this.logger.warn(`Payment ${event.payload.paymentId} marked as failed`);
}

private async tryFinalize(paymentId: string, triggeringEvent: BaseEvent<unknown>): Promise<void> {
const status = await this.statusRepository.findOne({ where: { paymentId } });
if (!status || status.finalStatus !== 'pending') {
return;
}

if (status.fraudStatus === 'succeeded' && status.ledgerStatus === 'succeeded') {
await this.statusRepository.update({ paymentId }, { finalStatus: 'settled' });
await this.paymentRepository.update({ id: paymentId }, { status: 'settled' });

const settledEvent: BaseEvent<PaymentSettledPayload> = {
eventId: randomUUID(),
eventType: 'payment.settled',
eventVersion: 1,
occurredAt: new Date().toISOString(),
correlationId: triggeringEvent.correlationId,
causationId: triggeringEvent.eventId,
aggregateId: paymentId,
aggregateType: 'payment',
country: triggeringEvent.country as string,
payload: {
paymentId,
settledAt: new Date().toISOString(),
},
};
await this.kafkaService.publish(TOPICS.PAYMENT_SETTLED, paymentId, settledEvent as unknown as Record<string, unknown>);
this.logger.log(`Payment ${paymentId} marked as settled`);
}
}
}
50 changes: 50 additions & 0 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { PaymentsModule } from './payments/payments.module';
import { RelayModule } from './relay/relay.module';
import { HealthModule } from './health/health.module';
import { ConsumersModule } from './consumers/consumers.module';
import { AggregatorModule } from './aggregator/aggregator.module';
import { NotificationsModule } from './notifications/notifications.module';
import { PaymentEntity } from './payments/entities/payment.entity';
import { OutboxEventEntity } from './payments/entities/outbox-event.entity';
import { PaymentProcessingStatusEntity } from './payments/entities/payment-processing-status.entity';
import { ConsumerProcessedEventEntity } from './payments/entities/consumer-processed-event.entity';
import { LedgerEntryEntity } from './payments/entities/ledger-entry.entity';
import { NotificationEntity } from './payments/entities/notification.entity';
import { KafkaModule } from './common/kafka/kafka.module';

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
TypeOrmModule.forRootAsync({
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
type: 'postgres' as const,
host: config.get<string>('DB_HOST'),
port: Number(config.get<string>('DB_PORT')),
username: config.get<string>('DB_USERNAME'),
password: config.get<string>('DB_PASSWORD'),
database: config.get<string>('DB_NAME'),
entities: [
PaymentEntity,
OutboxEventEntity,
PaymentProcessingStatusEntity,
ConsumerProcessedEventEntity,
LedgerEntryEntity,
NotificationEntity,
],
synchronize: false,
}),
}),
KafkaModule,
PaymentsModule,
RelayModule,
ConsumersModule,
AggregatorModule,
NotificationsModule,
HealthModule,
],
})
export class AppModule {}
9 changes: 9 additions & 0 deletions src/common/constants/consumer-names.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
export const CONSUMER_NAMES = {
FRAUD: 'fraud-consumer',
LEDGER: 'ledger-consumer',
STATUS_FRAUD: 'status-aggregator-fraud-reviewed',
STATUS_LEDGER: 'status-aggregator-ledger-written',
STATUS_FAILED: 'status-aggregator-failed',
NOTIFY_SETTLED: 'notify-consumer-settled',
NOTIFY_FAILED: 'notify-consumer-failed',
};
7 changes: 7 additions & 0 deletions src/common/constants/topics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export const TOPICS = {
PAYMENT_CREATED: 'payment.created.v1',
PAYMENT_FRAUD_REVIEWED: 'payment.fraud-reviewed.v1',
PAYMENT_LEDGER_WRITTEN: 'payment.ledger-written.v1',
PAYMENT_SETTLED: 'payment.settled.v1',
PAYMENT_FAILED: 'payment.failed.v1',
};
Loading