From d628391fd35a444a6490111224aa293c589af621 Mon Sep 17 00:00:00 2001 From: Godsmiracle001 Date: Fri, 27 Mar 2026 18:49:33 +0100 Subject: [PATCH] feat: add observability and stellar reconciliation worker --- .env.example | 10 + Readme.md | 21 +- src/app.ts | 39 ++- src/config/database.ts | 1 + src/config/env.ts | 100 +++++- src/index.ts | 94 +++++- src/middleware/error.middleware.ts | 50 ++- .../request-observability.middleware.ts | 88 +++++ src/observability/logger.ts | 53 +++ src/observability/metrics.ts | 125 ++++++++ src/routes/auth.routes.ts | 5 + src/types/express.d.ts | 2 + .../reconcile-pending-stellar-state.worker.ts | 301 ++++++++++++++++++ tests/observability.test.ts | 159 +++++++++ ...ncile-pending-stellar-state.worker.test.ts | 242 ++++++++++++++ 15 files changed, 1255 insertions(+), 35 deletions(-) create mode 100644 src/middleware/request-observability.middleware.ts create mode 100644 src/observability/logger.ts create mode 100644 src/observability/metrics.ts create mode 100644 src/workers/reconcile-pending-stellar-state.worker.ts create mode 100644 tests/observability.test.ts create mode 100644 tests/reconcile-pending-stellar-state.worker.test.ts diff --git a/.env.example b/.env.example index 7a60786..3426e31 100644 --- a/.env.example +++ b/.env.example @@ -29,6 +29,16 @@ JWT_SECRET=your-super-secret-jwt-key JWT_EXPIRES_IN=15m AUTH_CHALLENGE_TTL_MS=300000 +# Observability +METRICS_ENABLED=true + +# Background reconciliation +STELLAR_RECONCILIATION_ENABLED=false +STELLAR_RECONCILIATION_INTERVAL_MS=30000 +STELLAR_RECONCILIATION_BATCH_SIZE=25 +STELLAR_RECONCILIATION_GRACE_PERIOD_MS=60000 +STELLAR_RECONCILIATION_MAX_RUNTIME_MS=10000 + # Email SENDGRID_API_KEY=SG.xxxxxxxxxxxxx FROM_EMAIL=noreply@stellarsettle.com diff --git a/Readme.md b/Readme.md index a8463ee..e260aa1 100644 --- a/Readme.md +++ b/Readme.md @@ -111,6 +111,16 @@ JWT_SECRET=your-super-secret-jwt-key JWT_EXPIRES_IN=15m AUTH_CHALLENGE_TTL_MS=300000 +# Observability +METRICS_ENABLED=true + +# Background reconciliation +STELLAR_RECONCILIATION_ENABLED=false +STELLAR_RECONCILIATION_INTERVAL_MS=30000 +STELLAR_RECONCILIATION_BATCH_SIZE=25 +STELLAR_RECONCILIATION_GRACE_PERIOD_MS=60000 +STELLAR_RECONCILIATION_MAX_RUNTIME_MS=10000 + # Email SENDGRID_API_KEY=SG.xxxxxxxxxxxxx FROM_EMAIL=noreply@stellarsettle.com @@ -199,9 +209,16 @@ npm run test:e2e ## 📊 Monitoring -- Health check: `GET /health` +- Health check: `GET /health` (includes process uptime and request ID) - Metrics: `GET /metrics` (Prometheus format) -- Logs: Winston with daily rotation +- Metrics labels are intentionally low-cardinality: `method`, normalized route template, and `status_class` +- Logs: Winston JSON logs with `X-Request-Id` correlation IDs + +## Background Reconciliation + +- Enable `STELLAR_RECONCILIATION_ENABLED=true` to start the in-process worker. +- The worker scans a bounded batch of stale pending investments / transactions and reuses the existing Stellar payment verification path for idempotent reconciliation. +- Current deployment assumption: run the worker on a single replica unless you add your own leader-election or advisory-lock strategy. ## 🚢 Deployment ```bash diff --git a/src/app.ts b/src/app.ts index b664b71..a6c8d58 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,29 +1,58 @@ import cors from "cors"; import express from "express"; import helmet from "helmet"; -import { errorMiddleware, notFoundMiddleware } from "./middleware/error.middleware"; +import { createErrorMiddleware, notFoundMiddleware } from "./middleware/error.middleware"; +import { createRequestObservabilityMiddleware } from "./middleware/request-observability.middleware"; +import { logger, type AppLogger } from "./observability/logger"; +import { getMetricsContentType, MetricsRegistry } from "./observability/metrics"; import { createAuthRouter } from "./routes/auth.routes"; import type { AuthService } from "./services/auth.service"; export interface AppDependencies { authService: AuthService; + logger?: AppLogger; + metricsEnabled?: boolean; + metricsRegistry?: MetricsRegistry; } -export function createApp({ authService }: AppDependencies) { +export function createApp({ + authService, + logger: appLogger = logger, + metricsEnabled = true, + metricsRegistry = new MetricsRegistry(), +}: AppDependencies) { const app = express(); app.use(helmet()); app.use(cors()); app.use(express.json()); + app.use( + createRequestObservabilityMiddleware({ + logger: appLogger, + metricsEnabled, + metricsRegistry, + }), + ); - app.get("/health", (_req, res) => { - res.status(200).json({ status: "ok" }); + app.get("/health", (req, res) => { + res.status(200).json({ + status: "ok", + uptimeSeconds: Number(process.uptime().toFixed(3)), + requestId: req.requestId, + }); }); + if (metricsEnabled) { + app.get("/metrics", (_req, res) => { + res.setHeader("Content-Type", getMetricsContentType()); + res.status(200).send(metricsRegistry.renderPrometheusMetrics()); + }); + } + app.use("/api/v1/auth", createAuthRouter(authService)); app.use(notFoundMiddleware); - app.use(errorMiddleware); + app.use(createErrorMiddleware(appLogger)); return app; } diff --git a/src/config/database.ts b/src/config/database.ts index fa69f2a..db076f8 100644 --- a/src/config/database.ts +++ b/src/config/database.ts @@ -1,3 +1,4 @@ +import "dotenv/config"; import "reflect-metadata"; import { DataSource } from "typeorm"; diff --git a/src/config/env.ts b/src/config/env.ts index 6c32a53..be690ac 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -1,3 +1,4 @@ +import "dotenv/config"; import { Networks } from "stellar-sdk"; type SupportedStellarNetwork = "testnet" | "mainnet" | "futurenet"; @@ -12,6 +13,16 @@ export interface AppConfig { auth: { challengeTtlMs: number; }; + observability: { + metricsEnabled: boolean; + }; + reconciliation: { + enabled: boolean; + intervalMs: number; + batchSize: number; + gracePeriodMs: number; + maxRuntimeMs: number; + }; stellar: { network: SupportedStellarNetwork; networkPassphrase: string; @@ -21,6 +32,12 @@ export interface AppConfig { const DEFAULT_PORT = 3000; const DEFAULT_JWT_EXPIRES_IN = "15m"; const DEFAULT_CHALLENGE_TTL_MS = 5 * 60 * 1000; +const DEFAULT_METRICS_ENABLED = true; +const DEFAULT_RECONCILIATION_ENABLED = false; +const DEFAULT_RECONCILIATION_INTERVAL_MS = 30 * 1000; +const DEFAULT_RECONCILIATION_BATCH_SIZE = 25; +const DEFAULT_RECONCILIATION_GRACE_PERIOD_MS = 60 * 1000; +const DEFAULT_RECONCILIATION_MAX_RUNTIME_MS = 10 * 1000; function parsePort(value: string | undefined): number { if (!value) { @@ -36,18 +53,55 @@ function parsePort(value: string | undefined): number { return port; } -function parseChallengeTtl(value: string | undefined): number { +function parsePositiveInteger( + value: string | undefined, + fallback: number, + name: string, +): number { if (!value) { - return DEFAULT_CHALLENGE_TTL_MS; + return fallback; } - const ttl = Number(value); + const parsedValue = Number(value); - if (!Number.isInteger(ttl) || ttl <= 0) { - throw new Error("AUTH_CHALLENGE_TTL_MS must be a positive integer."); + if (!Number.isInteger(parsedValue) || parsedValue <= 0) { + throw new Error(`${name} must be a positive integer.`); } - return ttl; + return parsedValue; +} + +function parseChallengeTtl(value: string | undefined): number { + return parsePositiveInteger( + value, + DEFAULT_CHALLENGE_TTL_MS, + "AUTH_CHALLENGE_TTL_MS", + ); +} + +function parseBoolean( + value: string | undefined, + fallback: boolean, + name: string, +): boolean { + if (!value) { + return fallback; + } + + switch (value.toLowerCase()) { + case "true": + case "1": + case "yes": + case "on": + return true; + case "false": + case "0": + case "no": + case "off": + return false; + default: + throw new Error(`${name} must be a boolean.`); + } } function resolveNetwork(network: string | undefined): AppConfig["stellar"] { @@ -94,6 +148,40 @@ export function getConfig(): AppConfig { auth: { challengeTtlMs: parseChallengeTtl(process.env.AUTH_CHALLENGE_TTL_MS), }, + observability: { + metricsEnabled: parseBoolean( + process.env.METRICS_ENABLED, + DEFAULT_METRICS_ENABLED, + "METRICS_ENABLED", + ), + }, + reconciliation: { + enabled: parseBoolean( + process.env.STELLAR_RECONCILIATION_ENABLED, + DEFAULT_RECONCILIATION_ENABLED, + "STELLAR_RECONCILIATION_ENABLED", + ), + intervalMs: parsePositiveInteger( + process.env.STELLAR_RECONCILIATION_INTERVAL_MS, + DEFAULT_RECONCILIATION_INTERVAL_MS, + "STELLAR_RECONCILIATION_INTERVAL_MS", + ), + batchSize: parsePositiveInteger( + process.env.STELLAR_RECONCILIATION_BATCH_SIZE, + DEFAULT_RECONCILIATION_BATCH_SIZE, + "STELLAR_RECONCILIATION_BATCH_SIZE", + ), + gracePeriodMs: parsePositiveInteger( + process.env.STELLAR_RECONCILIATION_GRACE_PERIOD_MS, + DEFAULT_RECONCILIATION_GRACE_PERIOD_MS, + "STELLAR_RECONCILIATION_GRACE_PERIOD_MS", + ), + maxRuntimeMs: parsePositiveInteger( + process.env.STELLAR_RECONCILIATION_MAX_RUNTIME_MS, + DEFAULT_RECONCILIATION_MAX_RUNTIME_MS, + "STELLAR_RECONCILIATION_MAX_RUNTIME_MS", + ), + }, stellar: resolveNetwork(process.env.STELLAR_NETWORK), }; } diff --git a/src/index.ts b/src/index.ts index 33ae07d..e021292 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,9 +1,32 @@ +import type { Server } from "http"; import { createApp } from "./app"; -import { getConfig } from "./config/env"; import dataSource from "./config/database"; +import { getConfig } from "./config/env"; +import { getPaymentVerificationConfig } from "./config/stellar"; +import { logger } from "./observability/logger"; import { createAuthService } from "./services/auth.service"; +import { createVerifyPaymentService } from "./services/stellar/verify-payment.service"; +import { createReconcilePendingStellarStateWorker } from "./workers/reconcile-pending-stellar-state.worker"; + +export interface ApplicationRuntime { + stop(signal?: string): Promise; + server: Server; +} + +function closeServer(server: Server): Promise { + return new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } -export async function bootstrap(): Promise { + resolve(); + }); + }); +} + +export async function bootstrap(): Promise { const config = getConfig(); if (!dataSource.isInitialized) { @@ -11,13 +34,72 @@ export async function bootstrap(): Promise { } const authService = createAuthService(dataSource, config); - const app = createApp({ authService }); + const app = createApp({ + authService, + logger, + metricsEnabled: config.observability.metricsEnabled, + }); + const server = await new Promise((resolve) => { + const listeningServer = app.listen(config.port, () => { + logger.info("StellarSettle API listening.", { + port: config.port, + metricsEnabled: config.observability.metricsEnabled, + }); + resolve(listeningServer); + }); + }); + + const reconciliationWorker = config.reconciliation.enabled + ? createReconcilePendingStellarStateWorker( + dataSource, + createVerifyPaymentService(dataSource, getPaymentVerificationConfig()), + config.reconciliation, + logger, + ) + : null; - app.listen(config.port, () => { - process.stdout.write(`StellarSettle API listening on port ${config.port}\n`); + reconciliationWorker?.start(); + + let shutdownPromise: Promise | null = null; + + const stop = async (signal = "manual"): Promise => { + if (shutdownPromise) { + return shutdownPromise; + } + + shutdownPromise = (async () => { + logger.info("Shutting down StellarSettle API.", { signal }); + await reconciliationWorker?.stop(); + await closeServer(server); + + if (dataSource.isInitialized) { + await dataSource.destroy(); + } + + logger.info("StellarSettle API stopped.", { signal }); + })(); + + return shutdownPromise; + }; + + process.once("SIGTERM", () => { + void stop("SIGTERM"); + }); + process.once("SIGINT", () => { + void stop("SIGINT"); }); + + return { + stop, + server, + }; } if (require.main === module) { - void bootstrap(); + void bootstrap().catch((error: unknown) => { + logger.error("Failed to bootstrap StellarSettle API.", { + error: error instanceof Error ? error.message : "Unknown error", + }); + process.exitCode = 1; + }); } diff --git a/src/middleware/error.middleware.ts b/src/middleware/error.middleware.ts index d7dc4da..552bf0b 100644 --- a/src/middleware/error.middleware.ts +++ b/src/middleware/error.middleware.ts @@ -1,27 +1,45 @@ import type { NextFunction, Request, Response } from "express"; +import type { AppLogger } from "../observability/logger"; import { HttpError } from "../utils/http-error"; export function notFoundMiddleware(_req: Request, _res: Response, next: NextFunction) { next(new HttpError(404, "Route not found.")); } -export function errorMiddleware( - error: unknown, - _req: Request, - res: Response, - next: NextFunction, -): void { - void next; +export function createErrorMiddleware(logger: AppLogger) { + return ( + error: unknown, + req: Request, + res: Response, + next: NextFunction, + ): void => { + void next; - if (error instanceof HttpError) { - res.status(error.statusCode).json({ - error: error.message, - details: error.details, + if (error instanceof HttpError) { + logger.warn("HTTP request failed.", { + requestId: req.requestId, + method: req.method, + path: req.path, + statusCode: error.statusCode, + error: error.message, + }); + res.status(error.statusCode).json({ + error: error.message, + details: error.details, + }); + return; + } + + logger.error("Unhandled request error.", { + requestId: req.requestId, + method: req.method, + path: req.path, + statusCode: 500, + error: error instanceof Error ? error.message : "Unknown error", }); - return; - } - res.status(500).json({ - error: "Internal server error.", - }); + res.status(500).json({ + error: "Internal server error.", + }); + }; } diff --git a/src/middleware/request-observability.middleware.ts b/src/middleware/request-observability.middleware.ts new file mode 100644 index 0000000..cb8ab86 --- /dev/null +++ b/src/middleware/request-observability.middleware.ts @@ -0,0 +1,88 @@ +import { randomUUID } from "crypto"; +import type { NextFunction, Request, Response } from "express"; +import type { AppLogger } from "../observability/logger"; +import type { MetricsRegistry } from "../observability/metrics"; + +interface RequestObservabilityDependencies { + logger: AppLogger; + metricsEnabled: boolean; + metricsRegistry: MetricsRegistry; +} + +function resolveRoutePrefix(req: Request): string { + if (req.routeBasePath) { + return req.routeBasePath; + } + + if (req.baseUrl) { + return req.baseUrl; + } + + const originalPath = req.originalUrl.split("?")[0]; + + if (!req.path || !originalPath.endsWith(req.path)) { + return ""; + } + + return originalPath.slice(0, originalPath.length - req.path.length); +} + +function resolveRouteLabel(req: Request): string { + const routePath = req.route?.path; + + if (!routePath) { + return "unmatched"; + } + + const normalizedRoutePath = Array.isArray(routePath) ? routePath[0] : routePath; + const route = `${resolveRoutePrefix(req)}${normalizedRoutePath}`; + + return route || "/"; +} + +function resolveRequestId(requestIdHeader: string | string[] | undefined): string { + if (typeof requestIdHeader === "string" && requestIdHeader.trim()) { + return requestIdHeader.trim(); + } + + return randomUUID(); +} + +export function createRequestObservabilityMiddleware( + dependencies: RequestObservabilityDependencies, +) { + return (req: Request, res: Response, next: NextFunction): void => { + const requestId = resolveRequestId(req.header("x-request-id")); + const startedAt = process.hrtime.bigint(); + + req.requestId = requestId; + res.setHeader("X-Request-Id", requestId); + + res.on("finish", () => { + const durationMs = Number(process.hrtime.bigint() - startedAt) / 1_000_000; + const route = resolveRouteLabel(req); + const statusClass = `${Math.floor(res.statusCode / 100)}xx`; + const metadata = { + requestId, + method: req.method, + route, + statusCode: res.statusCode, + statusClass, + durationMs: Number(durationMs.toFixed(3)), + }; + + dependencies.logger.info("HTTP request completed.", metadata); + + if (dependencies.metricsEnabled) { + dependencies.metricsRegistry.recordHttpRequest({ + method: req.method, + route, + statusClass, + durationMs, + }); + } + }); + + next(); + }; +} diff --git a/src/observability/logger.ts b/src/observability/logger.ts new file mode 100644 index 0000000..b0b68c6 --- /dev/null +++ b/src/observability/logger.ts @@ -0,0 +1,53 @@ +import winston from "winston"; + +export type LogMetadata = Record; + +export interface AppLogger { + info(message: string, metadata?: LogMetadata): void; + warn(message: string, metadata?: LogMetadata): void; + error(message: string, metadata?: LogMetadata): void; + child(metadata: LogMetadata): AppLogger; +} + +class WinstonAppLogger implements AppLogger { + constructor(private readonly baseLogger: winston.Logger) {} + + info(message: string, metadata: LogMetadata = {}): void { + this.baseLogger.info(message, metadata); + } + + warn(message: string, metadata: LogMetadata = {}): void { + this.baseLogger.warn(message, metadata); + } + + error(message: string, metadata: LogMetadata = {}): void { + this.baseLogger.error(message, metadata); + } + + child(metadata: LogMetadata): AppLogger { + return new WinstonAppLogger(this.baseLogger.child(metadata)); + } +} + +function createBaseLogger(): winston.Logger { + return winston.createLogger({ + level: + process.env.LOG_LEVEL ?? + (process.env.NODE_ENV === "test" ? "silent" : "info"), + defaultMeta: { + service: "stellarsettle-api", + }, + format: winston.format.combine( + winston.format.timestamp(), + winston.format.errors({ stack: true }), + winston.format.json(), + ), + transports: [new winston.transports.Console()], + }); +} + +export function createLogger(baseLogger: winston.Logger = createBaseLogger()): AppLogger { + return new WinstonAppLogger(baseLogger); +} + +export const logger = createLogger(); diff --git a/src/observability/metrics.ts b/src/observability/metrics.ts new file mode 100644 index 0000000..8bdf209 --- /dev/null +++ b/src/observability/metrics.ts @@ -0,0 +1,125 @@ +const HTTP_DURATION_BUCKETS_MS = [5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000]; +const PROMETHEUS_CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"; + +interface RequestMetricLabels { + method: string; + route: string; + statusClass: string; +} + +interface HistogramMetric { + labels: RequestMetricLabels; + bucketCounts: number[]; + count: number; + sum: number; +} + +interface CounterMetric { + labels: RequestMetricLabels; + value: number; +} + +function escapeLabelValue(value: string): string { + return value.replace(/\\/g, "\\\\").replace(/\n/g, "\\n").replace(/"/g, '\\"'); +} + +function buildLabelSet(labels: RequestMetricLabels): string { + return `method="${escapeLabelValue(labels.method)}",route="${escapeLabelValue( + labels.route, + )}",status_class="${escapeLabelValue(labels.statusClass)}"`; +} + +function buildMetricKey(labels: RequestMetricLabels): string { + return `${labels.method}|${labels.route}|${labels.statusClass}`; +} + +export class MetricsRegistry { + private readonly requestCounters = new Map(); + private readonly requestDurationHistograms = new Map(); + + recordHttpRequest(input: RequestMetricLabels & { durationMs: number }): void { + const labels: RequestMetricLabels = { + method: input.method, + route: input.route, + statusClass: input.statusClass, + }; + const key = buildMetricKey(labels); + const requestCounter = this.requestCounters.get(key) ?? { + labels, + value: 0, + }; + + requestCounter.value += 1; + this.requestCounters.set(key, requestCounter); + + const histogram = this.requestDurationHistograms.get(key) ?? { + labels, + bucketCounts: HTTP_DURATION_BUCKETS_MS.map(() => 0), + count: 0, + sum: 0, + }; + + histogram.count += 1; + histogram.sum += input.durationMs; + + for (let index = 0; index < HTTP_DURATION_BUCKETS_MS.length; index += 1) { + if (input.durationMs <= HTTP_DURATION_BUCKETS_MS[index]) { + histogram.bucketCounts[index] += 1; + break; + } + } + + this.requestDurationHistograms.set(key, histogram); + } + + renderPrometheusMetrics(): string { + const lines = [ + "# HELP stellarsettle_http_requests_total Total completed HTTP requests.", + "# TYPE stellarsettle_http_requests_total counter", + ]; + + for (const metric of this.requestCounters.values()) { + lines.push( + `stellarsettle_http_requests_total{${buildLabelSet(metric.labels)}} ${metric.value}`, + ); + } + + lines.push( + "# HELP stellarsettle_http_request_duration_ms HTTP request duration in milliseconds.", + "# TYPE stellarsettle_http_request_duration_ms histogram", + ); + + for (const metric of this.requestDurationHistograms.values()) { + let cumulativeCount = 0; + + for (let index = 0; index < HTTP_DURATION_BUCKETS_MS.length; index += 1) { + cumulativeCount += metric.bucketCounts[index]; + lines.push( + `stellarsettle_http_request_duration_ms_bucket{${buildLabelSet( + metric.labels, + )},le="${HTTP_DURATION_BUCKETS_MS[index]}"} ${cumulativeCount}`, + ); + } + + lines.push( + `stellarsettle_http_request_duration_ms_bucket{${buildLabelSet( + metric.labels, + )},le="+Inf"} ${metric.count}`, + `stellarsettle_http_request_duration_ms_sum{${buildLabelSet(metric.labels)}} ${metric.sum}`, + `stellarsettle_http_request_duration_ms_count{${buildLabelSet(metric.labels)}} ${metric.count}`, + ); + } + + lines.push( + "# HELP stellarsettle_process_uptime_seconds Process uptime in seconds.", + "# TYPE stellarsettle_process_uptime_seconds gauge", + `stellarsettle_process_uptime_seconds ${process.uptime()}`, + ); + + return `${lines.join("\n")}\n`; + } +} + +export function getMetricsContentType(): string { + return PROMETHEUS_CONTENT_TYPE; +} diff --git a/src/routes/auth.routes.ts b/src/routes/auth.routes.ts index 888025c..b5436a6 100644 --- a/src/routes/auth.routes.ts +++ b/src/routes/auth.routes.ts @@ -20,6 +20,11 @@ export function createAuthRouter(authService: AuthService): Router { const controller = createAuthController(authService); const authMiddleware = createAuthMiddleware(authService); + router.use((req, _res, next) => { + req.routeBasePath = req.baseUrl; + next(); + }); + router.post("/challenge", validateBody(challengeSchema), controller.challenge); router.post("/verify", validateBody(verifySchema), controller.verify); router.get("/me", authMiddleware, controller.me); diff --git a/src/types/express.d.ts b/src/types/express.d.ts index 1925784..dc82704 100644 --- a/src/types/express.d.ts +++ b/src/types/express.d.ts @@ -4,6 +4,8 @@ declare global { namespace Express { interface Request { user?: AuthenticatedRequestUser; + requestId?: string; + routeBasePath?: string; } } } diff --git a/src/workers/reconcile-pending-stellar-state.worker.ts b/src/workers/reconcile-pending-stellar-state.worker.ts new file mode 100644 index 0000000..f675135 --- /dev/null +++ b/src/workers/reconcile-pending-stellar-state.worker.ts @@ -0,0 +1,301 @@ +import { DataSource, LessThanOrEqual, Not, IsNull, Repository } from "typeorm"; +import type { AppConfig } from "../config/env"; +import type { + PaymentVerificationInput, + PaymentVerificationResult, + VerifyPaymentService, +} from "../services/stellar/verify-payment.service"; +import { Investment } from "../models/Investment.model"; +import { Transaction } from "../models/Transaction.model"; +import { InvestmentStatus, TransactionStatus, TransactionType } from "../types/enums"; +import { ServiceError } from "../utils/service-error"; +import type { AppLogger } from "../observability/logger"; + +type YieldControl = () => Promise; +type IntervalHandle = ReturnType; + +export interface ReconciliationCandidate { + investmentId: string; + stellarTxHash: string; + operationIndex?: number; + source: "investment" | "transaction"; + queuedAt: Date; +} + +export interface ReconciliationCandidateRepository { + findPendingCandidates(olderThan: Date, limit: number): Promise; +} + +export interface PaymentVerifier { + verifyPayment(input: PaymentVerificationInput): Promise; +} + +export interface ReconciliationTickResult { + candidatesFetched: number; + processed: number; + verified: number; + alreadyVerified: number; + failed: number; + deferredDueToRuntime: number; + durationMs: number; +} + +interface ReconcilePendingStellarStateWorkerDependencies { + repository: ReconciliationCandidateRepository; + paymentVerifier: PaymentVerifier; + config: AppConfig["reconciliation"]; + logger: AppLogger; + now?: () => Date; + yieldControl?: YieldControl; + setIntervalFn?: typeof setInterval; + clearIntervalFn?: typeof clearInterval; +} + +const EMPTY_TICK_RESULT: ReconciliationTickResult = { + candidatesFetched: 0, + processed: 0, + verified: 0, + alreadyVerified: 0, + failed: 0, + deferredDueToRuntime: 0, + durationMs: 0, +}; + +export class ReconcilePendingStellarStateWorker { + private readonly repository: ReconciliationCandidateRepository; + private readonly paymentVerifier: PaymentVerifier; + private readonly config: AppConfig["reconciliation"]; + private readonly logger: AppLogger; + private readonly now: () => Date; + private readonly yieldControl: YieldControl; + private readonly setIntervalFn: typeof setInterval; + private readonly clearIntervalFn: typeof clearInterval; + private intervalHandle: IntervalHandle | null = null; + private inFlightTick: Promise | null = null; + + constructor(dependencies: ReconcilePendingStellarStateWorkerDependencies) { + this.repository = dependencies.repository; + this.paymentVerifier = dependencies.paymentVerifier; + this.config = dependencies.config; + this.logger = dependencies.logger.child({ + component: "stellar-reconciliation-worker", + }); + this.now = dependencies.now ?? (() => new Date()); + this.yieldControl = + dependencies.yieldControl ?? + (() => new Promise((resolve) => setImmediate(resolve))); + this.setIntervalFn = dependencies.setIntervalFn ?? setInterval; + this.clearIntervalFn = dependencies.clearIntervalFn ?? clearInterval; + } + + start(): void { + if (!this.config.enabled || this.intervalHandle) { + return; + } + + this.logger.info("Starting Stellar reconciliation worker.", { + intervalMs: this.config.intervalMs, + batchSize: this.config.batchSize, + gracePeriodMs: this.config.gracePeriodMs, + maxRuntimeMs: this.config.maxRuntimeMs, + singleReplicaAssumption: true, + }); + + void this.scheduleTick(); + this.intervalHandle = this.setIntervalFn(() => { + void this.scheduleTick(); + }, this.config.intervalMs); + } + + async stop(): Promise { + if (this.intervalHandle) { + this.clearIntervalFn(this.intervalHandle); + this.intervalHandle = null; + } + + if (this.inFlightTick) { + await this.inFlightTick; + } + + this.logger.info("Stopped Stellar reconciliation worker."); + } + + async runTick(): Promise { + const startedAt = this.now(); + const cutoff = new Date(startedAt.getTime() - this.config.gracePeriodMs); + const deadline = startedAt.getTime() + this.config.maxRuntimeMs; + + try { + const candidates = await this.repository.findPendingCandidates( + cutoff, + this.config.batchSize, + ); + const result: ReconciliationTickResult = { + ...EMPTY_TICK_RESULT, + candidatesFetched: candidates.length, + }; + + for (let index = 0; index < candidates.length; index += 1) { + if (this.now().getTime() >= deadline) { + result.deferredDueToRuntime = candidates.length - index; + break; + } + + const candidate = candidates[index]; + + try { + const verificationResult = await this.paymentVerifier.verifyPayment({ + investmentId: candidate.investmentId, + stellarTxHash: candidate.stellarTxHash, + operationIndex: candidate.operationIndex, + }); + + result.processed += 1; + + if (verificationResult.outcome === "verified") { + result.verified += 1; + } else { + result.alreadyVerified += 1; + } + } catch (error) { + result.processed += 1; + result.failed += 1; + this.logger.warn("Failed to reconcile pending Stellar state.", { + investmentId: candidate.investmentId, + stellarTxHash: candidate.stellarTxHash, + operationIndex: candidate.operationIndex, + source: candidate.source, + errorCode: error instanceof ServiceError ? error.code : undefined, + error: error instanceof Error ? error.message : "Unknown error", + }); + } + + await this.yieldControl(); + } + + result.durationMs = this.now().getTime() - startedAt.getTime(); + + this.logger.info("Completed Stellar reconciliation tick.", { + ...result, + }); + + return result; + } catch (error) { + const result = { + ...EMPTY_TICK_RESULT, + durationMs: this.now().getTime() - startedAt.getTime(), + }; + + this.logger.error("Stellar reconciliation tick crashed.", { + error: error instanceof Error ? error.message : "Unknown error", + durationMs: result.durationMs, + }); + + return result; + } + } + + private async scheduleTick(): Promise { + if (this.inFlightTick) { + this.logger.warn("Skipping Stellar reconciliation tick because one is already running."); + return; + } + + this.inFlightTick = this.runTick().finally(() => { + this.inFlightTick = null; + }); + + await this.inFlightTick; + } +} + +class TypeOrmReconciliationCandidateRepository + implements ReconciliationCandidateRepository +{ + constructor( + private readonly investmentRepository: Repository, + private readonly transactionRepository: Repository, + ) {} + + async findPendingCandidates(olderThan: Date, limit: number): Promise { + const investmentRows = await this.investmentRepository.find({ + where: { + status: InvestmentStatus.PENDING, + createdAt: LessThanOrEqual(olderThan), + transactionHash: Not(IsNull()), + }, + order: { + createdAt: "ASC", + }, + take: limit, + }); + const transactionRows = await this.transactionRepository.find({ + where: { + status: TransactionStatus.PENDING, + type: TransactionType.INVESTMENT, + timestamp: LessThanOrEqual(olderThan), + investmentId: Not(IsNull()), + stellarTxHash: Not(IsNull()), + }, + order: { + timestamp: "ASC", + }, + take: limit, + }); + + const candidatesByInvestmentId = new Map(); + + for (const investment of investmentRows) { + if (!investment.transactionHash) { + continue; + } + + candidatesByInvestmentId.set(investment.id, { + investmentId: investment.id, + stellarTxHash: investment.transactionHash, + operationIndex: investment.stellarOperationIndex ?? undefined, + source: "investment", + queuedAt: investment.createdAt, + }); + } + + for (const transaction of transactionRows) { + if (!transaction.investmentId || !transaction.stellarTxHash) { + continue; + } + + if (candidatesByInvestmentId.has(transaction.investmentId)) { + continue; + } + + candidatesByInvestmentId.set(transaction.investmentId, { + investmentId: transaction.investmentId, + stellarTxHash: transaction.stellarTxHash, + operationIndex: transaction.stellarOperationIndex ?? undefined, + source: "transaction", + queuedAt: transaction.timestamp, + }); + } + + return [...candidatesByInvestmentId.values()] + .sort((left, right) => left.queuedAt.getTime() - right.queuedAt.getTime()) + .slice(0, limit); + } +} + +export function createReconcilePendingStellarStateWorker( + dataSource: DataSource, + paymentVerifier: VerifyPaymentService, + config: AppConfig["reconciliation"], + logger: AppLogger, +): ReconcilePendingStellarStateWorker { + return new ReconcilePendingStellarStateWorker({ + repository: new TypeOrmReconciliationCandidateRepository( + dataSource.getRepository(Investment), + dataSource.getRepository(Transaction), + ), + paymentVerifier, + config, + logger, + }); +} diff --git a/tests/observability.test.ts b/tests/observability.test.ts new file mode 100644 index 0000000..e891213 --- /dev/null +++ b/tests/observability.test.ts @@ -0,0 +1,159 @@ +import request from "supertest"; +import { createApp } from "../src/app"; +import type { AppLogger, LogMetadata } from "../src/observability/logger"; +import { MetricsRegistry } from "../src/observability/metrics"; +import type { AuthService } from "../src/services/auth.service"; + +interface LogEntry { + level: "info" | "warn" | "error"; + message: string; + metadata: LogMetadata; +} + +class CaptureLogger implements AppLogger { + constructor( + readonly entries: LogEntry[] = [], + private readonly defaultMetadata: LogMetadata = {}, + ) {} + + info(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "info", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + warn(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "warn", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + error(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "error", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + child(metadata: LogMetadata): AppLogger { + return new CaptureLogger(this.entries, { + ...this.defaultMetadata, + ...metadata, + }); + } +} + +function createAuthServiceStub(): AuthService { + return { + createChallenge: async () => { + throw new Error("Not implemented."); + }, + verifyChallenge: async () => { + throw new Error("Not implemented."); + }, + getCurrentUser: async () => { + throw new Error("Not implemented."); + }, + } as unknown as AuthService; +} + +describe("Observability", () => { + it("assigns distinct request IDs and includes them in request lifecycle logs", async () => { + const logger = new CaptureLogger(); + const app = createApp({ + authService: createAuthServiceStub(), + logger, + metricsEnabled: true, + metricsRegistry: new MetricsRegistry(), + }); + + const [firstResponse, secondResponse] = await Promise.all([ + request(app).get("/health").expect(200), + request(app).get("/health").expect(200), + ]); + + expect(firstResponse.headers["x-request-id"]).toEqual(expect.any(String)); + expect(secondResponse.headers["x-request-id"]).toEqual(expect.any(String)); + expect(firstResponse.headers["x-request-id"]).not.toBe( + secondResponse.headers["x-request-id"], + ); + + const requestLogs = logger.entries.filter( + (entry) => entry.level === "info" && entry.message === "HTTP request completed.", + ); + + expect(requestLogs).toHaveLength(2); + expect(requestLogs.map((entry) => entry.metadata.requestId)).toEqual( + expect.arrayContaining([ + firstResponse.headers["x-request-id"], + secondResponse.headers["x-request-id"], + ]), + ); + }); + + it("reuses X-Request-Id when a client provides one", async () => { + const app = createApp({ + authService: createAuthServiceStub(), + logger: new CaptureLogger(), + metricsEnabled: true, + metricsRegistry: new MetricsRegistry(), + }); + + const response = await request(app) + .get("/health") + .set("X-Request-Id", "client-request-id") + .expect(200); + + expect(response.headers["x-request-id"]).toBe("client-request-id"); + expect(response.body.requestId).toBe("client-request-id"); + }); + + it("exposes Prometheus metrics for matched routes and unmatched requests", async () => { + const app = createApp({ + authService: createAuthServiceStub(), + logger: new CaptureLogger(), + metricsEnabled: true, + metricsRegistry: new MetricsRegistry(), + }); + + await request(app).get("/health").expect(200); + await request(app).get("/api/v1/auth/me").expect(401); + await request(app).get("/does-not-exist").expect(404); + + const metricsResponse = await request(app).get("/metrics").expect(200); + + expect(metricsResponse.headers["content-type"]).toContain("text/plain"); + expect(metricsResponse.text).toContain( + "# TYPE stellarsettle_http_requests_total counter", + ); + expect(metricsResponse.text).toContain( + 'stellarsettle_http_requests_total{method="GET",route="/health",status_class="2xx"} 1', + ); + expect(metricsResponse.text).toContain( + 'stellarsettle_http_requests_total{method="GET",route="/api/v1/auth/me",status_class="4xx"} 1', + ); + expect(metricsResponse.text).toContain( + 'stellarsettle_http_requests_total{method="GET",route="unmatched",status_class="4xx"} 1', + ); + expect(metricsResponse.text).toContain( + "# TYPE stellarsettle_http_request_duration_ms histogram", + ); + expect(metricsResponse.text).toContain( + "# TYPE stellarsettle_process_uptime_seconds gauge", + ); + }); +}); diff --git a/tests/reconcile-pending-stellar-state.worker.test.ts b/tests/reconcile-pending-stellar-state.worker.test.ts new file mode 100644 index 0000000..0199a1d --- /dev/null +++ b/tests/reconcile-pending-stellar-state.worker.test.ts @@ -0,0 +1,242 @@ +import { + ReconcilePendingStellarStateWorker, + type ReconciliationCandidate, +} from "../src/workers/reconcile-pending-stellar-state.worker"; +import type { AppLogger, LogMetadata } from "../src/observability/logger"; +import { InvestmentStatus } from "../src/types/enums"; +import { ServiceError } from "../src/utils/service-error"; +import type { PaymentVerificationResult } from "../src/services/stellar/verify-payment.service"; + +interface LogEntry { + level: "info" | "warn" | "error"; + message: string; + metadata: LogMetadata; +} + +class CaptureLogger implements AppLogger { + constructor( + readonly entries: LogEntry[] = [], + private readonly defaultMetadata: LogMetadata = {}, + ) {} + + info(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "info", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + warn(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "warn", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + error(message: string, metadata: LogMetadata = {}): void { + this.entries.push({ + level: "error", + message, + metadata: { + ...this.defaultMetadata, + ...metadata, + }, + }); + } + + child(metadata: LogMetadata): AppLogger { + return new CaptureLogger(this.entries, { + ...this.defaultMetadata, + ...metadata, + }); + } +} + +function createCandidate( + investmentId: string, + stellarTxHash: string, + overrides: Partial = {}, +): ReconciliationCandidate { + return { + investmentId, + stellarTxHash, + source: overrides.source ?? "investment", + operationIndex: overrides.operationIndex, + queuedAt: overrides.queuedAt ?? new Date("2026-01-01T00:00:00.000Z"), + }; +} + +function createVerifiedResult( + investmentId: string, + outcome: "verified" | "already_verified", +): PaymentVerificationResult { + return { + outcome, + investmentId, + stellarTxHash: `tx-${investmentId}`, + operationIndex: 0, + transactionId: `transaction-${investmentId}`, + status: InvestmentStatus.CONFIRMED, + }; +} + +describe("ReconcilePendingStellarStateWorker", () => { + afterEach(() => { + jest.useRealTimers(); + }); + + it("reconciles actionable candidates, continues after errors, and yields between items", async () => { + const now = new Date("2026-01-01T00:10:00.000Z"); + const repository = { + findPendingCandidates: jest.fn().mockResolvedValue([ + createCandidate("investment-1", "hash-1"), + createCandidate("investment-2", "hash-2"), + createCandidate("investment-3", "hash-3"), + ]), + }; + const paymentVerifier = { + verifyPayment: jest + .fn() + .mockResolvedValueOnce(createVerifiedResult("investment-1", "verified")) + .mockRejectedValueOnce( + new ServiceError("transaction_not_found", "Transaction not found.", 404), + ) + .mockResolvedValueOnce( + createVerifiedResult("investment-3", "already_verified"), + ), + }; + const yieldControl = jest.fn(async () => undefined); + const logger = new CaptureLogger(); + const worker = new ReconcilePendingStellarStateWorker({ + repository, + paymentVerifier, + config: { + enabled: true, + intervalMs: 1_000, + batchSize: 3, + gracePeriodMs: 60_000, + maxRuntimeMs: 10_000, + }, + logger, + now: () => now, + yieldControl, + }); + + const result = await worker.runTick(); + + expect(repository.findPendingCandidates).toHaveBeenCalledWith( + new Date("2026-01-01T00:09:00.000Z"), + 3, + ); + expect(paymentVerifier.verifyPayment).toHaveBeenCalledTimes(3); + expect(yieldControl).toHaveBeenCalledTimes(3); + expect(result).toMatchObject({ + candidatesFetched: 3, + processed: 3, + verified: 1, + alreadyVerified: 1, + failed: 1, + deferredDueToRuntime: 0, + }); + expect(logger.entries).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + level: "warn", + message: "Failed to reconcile pending Stellar state.", + }), + expect.objectContaining({ + level: "info", + message: "Completed Stellar reconciliation tick.", + }), + ]), + ); + }); + + it("stops starting new reconciliations once the tick runtime budget is exhausted", async () => { + let currentTimeMs = Date.parse("2026-01-01T00:00:00.000Z"); + const repository = { + findPendingCandidates: jest.fn().mockResolvedValue([ + createCandidate("investment-1", "hash-1"), + createCandidate("investment-2", "hash-2"), + createCandidate("investment-3", "hash-3"), + ]), + }; + const paymentVerifier = { + verifyPayment: jest.fn(async (input: { investmentId: string }) => { + currentTimeMs += 60; + return createVerifiedResult(input.investmentId, "verified"); + }), + }; + const worker = new ReconcilePendingStellarStateWorker({ + repository, + paymentVerifier, + config: { + enabled: true, + intervalMs: 1_000, + batchSize: 3, + gracePeriodMs: 60_000, + maxRuntimeMs: 100, + }, + logger: new CaptureLogger(), + now: () => new Date(currentTimeMs), + yieldControl: async () => undefined, + }); + + const result = await worker.runTick(); + + expect(paymentVerifier.verifyPayment).toHaveBeenCalledTimes(2); + expect(result).toMatchObject({ + candidatesFetched: 3, + processed: 2, + verified: 2, + deferredDueToRuntime: 1, + }); + }); + + it("schedules periodic ticks and stops scheduling after stop is called", async () => { + jest.useFakeTimers(); + + const repository = { + findPendingCandidates: jest.fn().mockResolvedValue([]), + }; + const paymentVerifier = { + verifyPayment: jest.fn(), + }; + const worker = new ReconcilePendingStellarStateWorker({ + repository, + paymentVerifier, + config: { + enabled: true, + intervalMs: 1_000, + batchSize: 5, + gracePeriodMs: 60_000, + maxRuntimeMs: 5_000, + }, + logger: new CaptureLogger(), + yieldControl: async () => undefined, + }); + + worker.start(); + await Promise.resolve(); + + expect(repository.findPendingCandidates).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(2_000); + + expect(repository.findPendingCandidates).toHaveBeenCalledTimes(3); + + await worker.stop(); + + await jest.advanceTimersByTimeAsync(5_000); + + expect(repository.findPendingCandidates).toHaveBeenCalledTimes(3); + }); +});