From 2c561cad0afd584ccc6b04da25a853006ef13fb9 Mon Sep 17 00:00:00 2001 From: Pavel Zotikov Date: Sun, 16 Mar 2025 04:46:21 +0300 Subject: [PATCH 1/4] Chore: Add new worker `performance` --- .gitignore | 1 + package.json | 3 +- workers/performance/README.md | 63 +++++++++++++++++++ workers/performance/package.json | 15 +++++ workers/performance/src/index.ts | 100 +++++++++++++++++++++++++++++++ workers/performance/src/types.ts | 64 ++++++++++++++++++++ 6 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 workers/performance/README.md create mode 100644 workers/performance/package.json create mode 100644 workers/performance/src/index.ts create mode 100644 workers/performance/src/types.ts diff --git a/.gitignore b/.gitignore index cb3f9358..89e21924 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ coverage .DS_Store globalConfig.json *.log +.history diff --git a/package.json b/package.json index a02a10d5..cd9db6e9 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,8 @@ "run-release": "yarn worker hawk-worker-release", "run-email": "yarn worker hawk-worker-email", "run-telegram": "yarn worker hawk-worker-telegram", - "run-limiter": "yarn worker hawk-worker-limiter" + "run-limiter": "yarn worker hawk-worker-limiter", + "run-performance": "yarn worker hawk-worker-performance" }, "dependencies": { "@hawk.so/nodejs": "^3.1.1", diff --git a/workers/performance/README.md b/workers/performance/README.md new file mode 100644 index 00000000..26eab945 --- /dev/null +++ b/workers/performance/README.md @@ -0,0 +1,63 @@ +# Performance worker + +This worker is needed to save performance data uploaded from user to our DB. + +## Performance delivery scheme + +1. User wants to deploy project +2. He runs deploy script on the server and it runs static builder, for example Webpack. +3. After Webpack finished his job, our [Webpack Plugin](https://github.com/codex-team/hawk.webpack.plugin) gets a source maps for new bundles and sends them to us. + +example request: + +```bash +curl --location 'http://localhost:3000/performance' \ +--header 'Content-Type: application/json' \ +--data '{ + "token": "eyJpbnRlZ3JhdGlvbklkIjoiZTU2ZTU5ODctN2JhZi00NTI3LWI4MmMtYjdkOWRhZDBiMDBmIiwic2VjcmV0IjoiZDQ5YTU0YjMtOWExZi00ZGI2LTkxZmYtMjk4M2JlMTVlODA0In0=", + "projectId": "67d4adeccf25fa00ab563c32", + "catcherType": "performance", + "payload": { + "projectId": "67d4adeccf25fa00ab563c32", + "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG", + "name": "complex-operation", + "timestamp": 1742075217, + "duration": 702.9999999999964, + "startTime": 15322.000000000002, + "endTime": 16024.999999999998, + "catcherVersion": "3.2.1", + "spans": [ + { + "id": "6tk2UD4m0wDUjD99uvO1wylp3SnYumiWPlhRCZ2w", + "name": "step-1", + "duration": 400.9999999999982, + "startTime": 15322.000000000002, + "endTime": 15723, + "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG" + }, + { + "id": "V2ZOiUWjtip5ZSATIr7VX4KInAZgGJxdUQNZot2j", + "name": "step-2", + "duration": 301.9999999999982, + "startTime": 15723, + "endTime": 16024.999999999998, + "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG" + }, + { + "id": "JGIerBJqInvnvpIPBwsUIfuIxt3LcWQ6lFPwQJdN", + "name": "step-3", + "duration": 300.9999999999982, + "startTime": 15724, + "endTime": 16024.999999999998, + "transactionId": "drxEFnbxGc7OumTVl3FkCm1v9BvBC9OpBrEiE3qG" + } + ], + "tags": { + "type": "background" + } + } +}' +``` + +4. Collector accepts file and give a task for PerformanceWorker for saving it to DB +5. PerformanceWorker saves it to DB. diff --git a/workers/performance/package.json b/workers/performance/package.json new file mode 100644 index 00000000..d7484538 --- /dev/null +++ b/workers/performance/package.json @@ -0,0 +1,15 @@ +{ + "name": "hawk-worker-performance", + "description": "Collects and parses performance", + "workerType": "performance", + "version": "0.0.1", + "main": "src/index.ts", + "repository": "https://github.com/codex-team/hawk.workers/tree/master/workers/performance", + "author": "CodeX", + "license": "UNLICENSED", + "private": true, + "devDependencies": { + "rimraf": "^3.0.0", + "webpack": "^4.39.3" + } +} \ No newline at end of file diff --git a/workers/performance/src/index.ts b/workers/performance/src/index.ts new file mode 100644 index 00000000..8059b318 --- /dev/null +++ b/workers/performance/src/index.ts @@ -0,0 +1,100 @@ +import { DatabaseController } from '../../../lib/db/controller'; +import { Worker } from '../../../lib/worker'; +import { DatabaseReadWriteError } from '../../../lib/workerErrors'; +import * as pkg from '../package.json'; +import { Collection } from 'mongodb'; +import type { PerformanceRecord, PerformanceDocument, PerformanceSpansDocument } from './types'; + +/** + * Performance worker + */ +export default class PerformanceWorker extends Worker { + /** + * Worker type (will pull tasks from Registry queue with the same name) + */ + public readonly type: string = pkg.workerType; + + /** + * Database Controller + */ + private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); + + private readonly dbCollectionName: string = 'performance'; + private readonly dbSpansCollectionName: string = 'performance_spans'; + /** + * Collection to save performance data + */ + private performanceCollection: Collection; + + /** + * Collection to save performance spans + */ + private performanceSpansCollection: Collection; + + /** + * Start consuming messages + */ + public async start(): Promise { + await this.db.connect(); + this.db.createGridFsBucket(this.dbCollectionName); + this.performanceCollection = this.db.getConnection().collection(this.dbCollectionName); + this.performanceSpansCollection = this.db.getConnection().collection(this.dbSpansCollectionName); + await super.start(); + } + + /** + * Finish everything + */ + public async finish(): Promise { + await super.finish(); + await this.db.close(); + } + + /** + * Message handle function + * + * @param task - Message object from consume method + */ + public async handle(task: PerformanceRecord): Promise { + switch (task.catcherType) { + case 'performance': await this.savePerformance(task); break; + } + } + + /** + * Save performance data to database + * + * @param data - Performance record containing project ID and performance metrics + */ + private async savePerformance(data: PerformanceRecord): Promise { + try { + const { projectId, payload, catcherType } = data; + + if (catcherType !== 'performance') { + throw new Error('Invalid catcher type'); + } + + await Promise.all([ + this.performanceCollection.insertOne({ + projectId, + transactionId: payload.id, + timestamp: payload.timestamp, + duration: payload.duration, + name: payload.name, + catcherVersion: payload.catcherVersion, + tags: payload.tags, + }), + this.performanceSpansCollection.insertMany( + payload.spans.map(span => ({ + projectId, + timestamp: payload.timestamp, + ...span, + })) + ), + ]); + } catch (err) { + this.logger.error(`Couldn't save the release due to: ${err}`); + throw new DatabaseReadWriteError(err); + } + } +} diff --git a/workers/performance/src/types.ts b/workers/performance/src/types.ts new file mode 100644 index 00000000..8a4a4a80 --- /dev/null +++ b/workers/performance/src/types.ts @@ -0,0 +1,64 @@ +/** + * Interface for time span + */ +interface Span { + id: string; + name: string; + duration: number; + startTime: number; + endTime: number; + transactionId: string; +} + +/** + * Interface for performance data + */ +interface PerformancePayload { + id: string; + name: string; + timestamp: number; + duration: number; + startTime: number; + endTime: number; + catcherVersion: string; + spans: Span[]; + tags: { + [key: string]: string; + }; +} + +/** + * Main interface for performance record + */ +interface PerformanceRecord { + projectId: string; + payload: PerformancePayload; + catcherType: 'performance'; +} + +/** + * Interface for performance database document + */ +interface PerformanceDocument { + projectId: string; + transactionId: string; + timestamp: number; + duration: number; + name: string; + catcherVersion: string; + tags: Record; +} + +interface PerformanceSpansDocument extends Span { + projectId: string; + transactionId: string; + timestamp: number; +} + +export type { + Span, + PerformanceRecord, + PerformancePayload, + PerformanceDocument, + PerformanceSpansDocument +}; \ No newline at end of file From 32ff75609346386df10145bbc124c36f5a4c6691 Mon Sep 17 00:00:00 2001 From: Pavel Zotikov Date: Sun, 16 Mar 2025 04:50:30 +0300 Subject: [PATCH 2/4] change base collection for save transactions --- workers/performance/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/performance/src/index.ts b/workers/performance/src/index.ts index 8059b318..0105da3b 100644 --- a/workers/performance/src/index.ts +++ b/workers/performance/src/index.ts @@ -19,7 +19,7 @@ export default class PerformanceWorker extends Worker { */ private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); - private readonly dbCollectionName: string = 'performance'; + private readonly dbCollectionName: string = 'performance_transactions'; private readonly dbSpansCollectionName: string = 'performance_spans'; /** * Collection to save performance data From b940a2dffa020ef5b38795e43e67959291411e0f Mon Sep 17 00:00:00 2001 From: Pavel Zotikov Date: Wed, 2 Apr 2025 20:15:52 +0300 Subject: [PATCH 3/4] refactor: update data format --- workers/performance/src/index.ts | 47 +++++++++++++++++------------ workers/performance/src/types.ts | 52 +++++++++++++++++++------------- 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/workers/performance/src/index.ts b/workers/performance/src/index.ts index 0105da3b..900851f9 100644 --- a/workers/performance/src/index.ts +++ b/workers/performance/src/index.ts @@ -14,6 +14,11 @@ export default class PerformanceWorker extends Worker { */ public readonly type: string = pkg.workerType; + /** + * Catcher version + */ + private readonly catcherVersion: string = '1.0'; + /** * Database Controller */ @@ -74,26 +79,30 @@ export default class PerformanceWorker extends Worker { throw new Error('Invalid catcher type'); } - await Promise.all([ - this.performanceCollection.insertOne({ - projectId, - transactionId: payload.id, - timestamp: payload.timestamp, - duration: payload.duration, - name: payload.name, - catcherVersion: payload.catcherVersion, - tags: payload.tags, - }), - this.performanceSpansCollection.insertMany( - payload.spans.map(span => ({ - projectId, - timestamp: payload.timestamp, - ...span, - })) - ), - ]); + await Promise.all( + payload.transactions.map(async (transaction) => { + await Promise.all([ + this.performanceCollection.insertOne({ + projectId, + transactionId: transaction.aggregationId, + timestamp: payload.timestamp, + duration: transaction.p95duration, + name: transaction.name, + catcherVersion: this.catcherVersion, + }), + this.performanceSpansCollection.insertMany( + transaction.aggregatedSpans.map(span => ({ + projectId, + transactionId: transaction.aggregationId, + timestamp: payload.timestamp, + ...span, + })) + ), + ]); + }) + ); } catch (err) { - this.logger.error(`Couldn't save the release due to: ${err}`); + this.logger.error(`Couldn't save performance data due to: ${err}`); throw new DatabaseReadWriteError(err); } } diff --git a/workers/performance/src/types.ts b/workers/performance/src/types.ts index 8a4a4a80..5472c063 100644 --- a/workers/performance/src/types.ts +++ b/workers/performance/src/types.ts @@ -1,30 +1,40 @@ /** - * Interface for time span + * Interface for aggregated span data */ -interface Span { - id: string; +interface AggregatedSpan { + aggregationId: string; name: string; - duration: number; - startTime: number; - endTime: number; - transactionId: string; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + failureRate: number; +} + +/** + * Interface for transaction data + */ +interface Transaction { + aggregationId: string; + name: string; + avgStartTime: number; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + count: number; + failureRate: number; + aggregatedSpans: AggregatedSpan[]; } /** * Interface for performance data */ interface PerformancePayload { - id: string; - name: string; - timestamp: number; - duration: number; - startTime: number; - endTime: number; - catcherVersion: string; - spans: Span[]; - tags: { - [key: string]: string; - }; + transactions: Transaction[]; + timestamp: number; } /** @@ -46,17 +56,17 @@ interface PerformanceDocument { duration: number; name: string; catcherVersion: string; - tags: Record; } -interface PerformanceSpansDocument extends Span { +interface PerformanceSpansDocument extends AggregatedSpan { projectId: string; transactionId: string; timestamp: number; } export type { - Span, + AggregatedSpan, + Transaction, PerformanceRecord, PerformancePayload, PerformanceDocument, From 957945beb5ac6a227cd10979f042796a23ed6b8b Mon Sep 17 00:00:00 2001 From: Pavel Zotikov Date: Thu, 3 Apr 2025 00:41:21 +0300 Subject: [PATCH 4/4] feat(performance): add data aggregation and rounding - Add transaction aggregation with p50/p95 percentiles - Round all numeric values to 3 decimal places - Add detailed JSDoc comments --- workers/performance/src/index.ts | 181 +++++++++++++++++++++++++------ workers/performance/src/types.ts | 26 ++++- 2 files changed, 168 insertions(+), 39 deletions(-) diff --git a/workers/performance/src/index.ts b/workers/performance/src/index.ts index 900851f9..4640fc12 100644 --- a/workers/performance/src/index.ts +++ b/workers/performance/src/index.ts @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker'; import { DatabaseReadWriteError } from '../../../lib/workerErrors'; import * as pkg from '../package.json'; import { Collection } from 'mongodb'; -import type { PerformanceRecord, PerformanceDocument, PerformanceSpansDocument } from './types'; +import type { PerformanceRecord, PerformanceDocument, AggregatedTransaction } from './types'; /** * Performance worker @@ -14,28 +14,18 @@ export default class PerformanceWorker extends Worker { */ public readonly type: string = pkg.workerType; - /** - * Catcher version - */ - private readonly catcherVersion: string = '1.0'; - /** * Database Controller */ private db: DatabaseController = new DatabaseController(process.env.MONGO_EVENTS_DATABASE_URI); private readonly dbCollectionName: string = 'performance_transactions'; - private readonly dbSpansCollectionName: string = 'performance_spans'; + /** * Collection to save performance data */ private performanceCollection: Collection; - /** - * Collection to save performance spans - */ - private performanceSpansCollection: Collection; - /** * Start consuming messages */ @@ -43,7 +33,6 @@ export default class PerformanceWorker extends Worker { await this.db.connect(); this.db.createGridFsBucket(this.dbCollectionName); this.performanceCollection = this.db.getConnection().collection(this.dbCollectionName); - this.performanceSpansCollection = this.db.getConnection().collection(this.dbSpansCollectionName); await super.start(); } @@ -70,6 +59,13 @@ export default class PerformanceWorker extends Worker { * Save performance data to database * * @param data - Performance record containing project ID and performance metrics + * + * Key operations: + * 1. Round all numeric values to 3 decimal places to avoid floating point precision issues + * 2. Add timestamp to each transaction + * 3. Round values in aggregatedSpans as well + * 4. Use bulkWrite for efficient database operations + * 5. Trigger aggregation after saving */ private async savePerformance(data: PerformanceRecord): Promise { try { @@ -79,31 +75,150 @@ export default class PerformanceWorker extends Worker { throw new Error('Invalid catcher type'); } - await Promise.all( - payload.transactions.map(async (transaction) => { - await Promise.all([ - this.performanceCollection.insertOne({ + const ROUND_DECIMALS = 3; + const BASE = 10; + const transactionsWithTimestamp = payload.transactions.map(transaction => ({ + ...transaction, + timestamp: payload.timestamp, + minStartTime: Math.round(transaction.minStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxEndTime: Math.round(transaction.maxEndTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxDuration: Math.round(transaction.maxDuration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p50duration: Math.round(transaction.p50duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p95duration: Math.round(transaction.p95duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + avgStartTime: Math.round(transaction.avgStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + aggregatedSpans: transaction.aggregatedSpans.map(span => ({ + ...span, + minStartTime: Math.round(span.minStartTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxEndTime: Math.round(span.maxEndTime * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + maxDuration: Math.round(span.maxDuration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p50duration: Math.round(span.p50duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + p95duration: Math.round(span.p95duration * Math.pow(BASE, ROUND_DECIMALS)) / Math.pow(BASE, ROUND_DECIMALS), + })), + })); + + const bulkOperations = transactionsWithTimestamp.map(transaction => ({ + updateOne: { + filter: { + projectId, + name: transaction.name, + }, + update: { + $push: { + transactions: transaction, + }, + $setOnInsert: { projectId, - transactionId: transaction.aggregationId, - timestamp: payload.timestamp, - duration: transaction.p95duration, name: transaction.name, - catcherVersion: this.catcherVersion, - }), - this.performanceSpansCollection.insertMany( - transaction.aggregatedSpans.map(span => ({ - projectId, - transactionId: transaction.aggregationId, - timestamp: payload.timestamp, - ...span, - })) - ), - ]); - }) - ); + createdAt: new Date(), + }, + }, + upsert: true, + } + })); + + await this.performanceCollection.bulkWrite(bulkOperations); + await this.aggregateTransactions(projectId); } catch (err) { this.logger.error(`Couldn't save performance data due to: ${err}`); throw new DatabaseReadWriteError(err); } } + + /** + * Aggregate transactions data for a project + * + * @param projectId - Project ID to aggregate data for + * + * Key operations: + * 1. Calculate min/max timestamps across all transactions + * 2. Sort durations array to calculate percentiles + * 3. Calculate p50 and p95 percentiles from sorted durations + * 4. Calculate failure rate based on error count + * 5. Round all numeric values to 3 decimal places + * 6. Update documents with aggregated data + */ + private async aggregateTransactions(projectId: string): Promise { + const PERCENTILE_50 = 0.5; + const PERCENTILE_95 = 0.95; + const ROUND_DECIMALS = 3; + + const aggregationPipeline = [ + { $match: { projectId } }, + { $unwind: '$transactions' }, + { + $group: { + _id: '$name', + minStartTime: { $min: '$transactions.minStartTime' }, + maxEndTime: { $max: '$transactions.maxEndTime' }, + durations: { $push: '$transactions.maxDuration' }, + maxDurations: { $push: '$transactions.maxDuration' }, + totalCount: { $sum: 1 }, + errorCount: { + $sum: { + $cond: [ { $eq: ['$transactions.status', 'error'] }, 1, 0], + }, + }, + }, + }, + { + $project: { + _id: 0, + name: '$_id', + minStartTime: { + $round: [ + { $min: '$minStartTime' }, + ROUND_DECIMALS, + ] + }, + maxEndTime: { + $round: [ + { $max: '$maxEndTime' }, + ROUND_DECIMALS, + ] + }, + p50duration: { + $round: [ + { + $arrayElemAt: [ + { $sortArray: { input: '$durations', sortBy: 1 } }, + { $floor: { $multiply: [ { $size: '$durations' }, PERCENTILE_50] } }, + ] + }, + ROUND_DECIMALS, + ] + }, + p95duration: { + $round: [ + { + $arrayElemAt: [ + { $sortArray: { input: '$durations', sortBy: 1 } }, + { $floor: { $multiply: [ { $size: '$durations' }, PERCENTILE_95] } }, + ] + }, + ROUND_DECIMALS, + ] + }, + maxDuration: { + $round: [ + { $max: '$maxDurations' }, + ROUND_DECIMALS, + ] + }, + failureRate: { + $round: [ + { $divide: ['$errorCount', '$totalCount'] }, + ROUND_DECIMALS, + ] + }, + }, + }, + ]; + + const aggregatedData = await this.performanceCollection.aggregate(aggregationPipeline).toArray(); + + await this.performanceCollection.updateMany( + { projectId }, + { $set: { aggregatedData } } + ); + } } diff --git a/workers/performance/src/types.ts b/workers/performance/src/types.ts index 5472c063..e2395b53 100644 --- a/workers/performance/src/types.ts +++ b/workers/performance/src/types.ts @@ -27,6 +27,7 @@ interface Transaction { count: number; failureRate: number; aggregatedSpans: AggregatedSpan[]; + timestamp: number; } /** @@ -51,24 +52,37 @@ interface PerformanceRecord { */ interface PerformanceDocument { projectId: string; - transactionId: string; - timestamp: number; - duration: number; name: string; - catcherVersion: string; + transactions: Transaction[]; + aggregatedData?: AggregatedTransaction[]; + createdAt: Date; } -interface PerformanceSpansDocument extends AggregatedSpan { +/** + * Interface for aggregated transaction data + */ +interface AggregatedTransaction { + name: string; + minStartTime: number; + maxEndTime: number; + p50duration: number; + p95duration: number; + maxDuration: number; + failureRate: number; +} + +interface PerformanceSpansDocument extends AggregatedTransaction { projectId: string; transactionId: string; timestamp: number; } export type { - AggregatedSpan, Transaction, + AggregatedSpan, PerformanceRecord, PerformancePayload, PerformanceDocument, + AggregatedTransaction, PerformanceSpansDocument }; \ No newline at end of file