From e62415ad03ad02345bf4b5fdd2ebafef07860e61 Mon Sep 17 00:00:00 2001 From: Peter Tsrunchev Date: Fri, 12 Dec 2025 19:45:31 +0000 Subject: [PATCH] fix(metrics): usage performance optimization --- .../common/metric_calculator/mongo.py | 93 ++++++++----------- tests/unit/test_metric_mongo_usage.py | 20 ++-- 2 files changed, 55 insertions(+), 58 deletions(-) diff --git a/polytope_server/common/metric_calculator/mongo.py b/polytope_server/common/metric_calculator/mongo.py index 977e5f6..c0fda7e 100644 --- a/polytope_server/common/metric_calculator/mongo.py +++ b/polytope_server/common/metric_calculator/mongo.py @@ -19,6 +19,7 @@ # import logging +import time from typing import Any, Dict, List, Optional, Sequence, Tuple from pymongo import ASCENDING, DESCENDING @@ -245,64 +246,52 @@ def get_usage_metrics_aggregated(self, cutoff_timestamps: Dict[str, float]) -> D logger.debug("Aggregating usage metrics for cutoffs %s", cutoff_timestamps) - # Prepare group stages for counting - # We want to count total, and for each window - requests_counts = { - "_id": None, - "total": {"$sum": 1}, - } - users_counts = { - "_id": None, - "total": {"$sum": 1}, - } - - for name, cutoff in cutoff_timestamps.items(): - req_cond = {"$cond": [{"$gte": ["$timestamp", cutoff]}, 1, 0]} - requests_counts[name] = {"$sum": req_cond} - - user_cond = {"$cond": [{"$gte": ["$ts", cutoff]}, 1, 0]} - users_counts[name] = {"$sum": user_cond} - - facets = { - "requests": [ - {"$group": requests_counts}, - ], - "users": [ - {"$match": {"user_id": {"$exists": True, "$type": "string"}}}, - {"$group": {"_id": "$user_id", "ts": {"$max": "$timestamp"}}}, - {"$group": users_counts}, - ], + base_filter = { + "type": "request_status_change", + "status": "processed", } - pipeline = [ - { - "$match": { - "type": "request_status_change", - "status": "processed", - } - }, - {"$facet": facets}, + # Total number of requests + # Assume no duplicates + t0 = time.time() + total_requests = self.metric_collection.count_documents(base_filter) + logger.debug("Total requests count took %.4fs", time.time() - t0) + + # Total number of unique users + t0 = time.time() + user_pipeline = [ + {"$match": base_filter}, + {"$group": {"_id": "$user_id"}}, + {"$count": "total"}, ] + user_res = list(self.metric_collection.aggregate(user_pipeline, allowDiskUse=True)) + unique_users = user_res[0]["total"] if user_res else 0 + logger.debug("Total unique users count took %.4fs", time.time() - t0) - assert self.metric_collection is not None - result_list = list(self.metric_collection.aggregate(pipeline)) - facet_results = result_list[0] if result_list else {} - - # Extract results - req_res = facet_results.get("requests", []) - req_data = req_res[0] if req_res else {} - - user_res = facet_results.get("users", []) - user_data = user_res[0] if user_res else {} + timeframe_metrics = {} - total_requests = req_data.get("total", 0) - unique_users = user_data.get("total", 0) + # Per-window metrics + for name, cutoff in cutoff_timestamps.items(): + window_filter = base_filter.copy() + window_filter["timestamp"] = {"$gte": cutoff} + + t0 = time.time() + req_count = self.metric_collection.count_documents(window_filter) + logger.debug("Request count for window %s took %.4fs", name, time.time() - t0) + + t0 = time.time() + window_user_pipeline = [ + {"$match": window_filter}, + {"$group": {"_id": "$user_id"}}, + {"$count": "total"}, + ] + window_user_res = list(self.metric_collection.aggregate(window_user_pipeline, allowDiskUse=True)) + user_count = window_user_res[0]["total"] if window_user_res else 0 + logger.debug("User count for window %s took %.4fs", name, time.time() - t0) - timeframe_metrics = {} - for framename in cutoff_timestamps.keys(): - timeframe_metrics[framename] = { - "requests": req_data.get(framename, 0), - "unique_users": user_data.get(framename, 0), + timeframe_metrics[name] = { + "requests": req_count, + "unique_users": user_count, } result = { diff --git a/tests/unit/test_metric_mongo_usage.py b/tests/unit/test_metric_mongo_usage.py index e3255c9..43a3453 100644 --- a/tests/unit/test_metric_mongo_usage.py +++ b/tests/unit/test_metric_mongo_usage.py @@ -97,6 +97,14 @@ def test_get_usage_metrics_aggregated_basic() -> None: "request_id": "r_bad2", "user_id": "u1", }, + # duplicate processed for same request (should not happen, but test anyway) + { + "type": "request_status_change", + "status": "processed", + "timestamp": 12000, + "request_id": "r1", + "user_id": "u1", + }, ] metric_coll.insert_many(docs) @@ -110,17 +118,17 @@ def test_get_usage_metrics_aggregated_basic() -> None: res = calc.get_usage_metrics_aggregated(cutoffs) - # Total requests: 6 valid processed requests - assert res["total_requests"] == 6 + # Total requests: 7 valid processed requests (including duplicate) + assert res["total_requests"] == 7 # Unique users: u1, u2, u3, u4 -> 4 assert res["unique_users"] == 4 tf = res["timeframe_metrics"] - # last_1h (>= 10000): r1, r2, r3 -> 3 requests. u1, u2 -> 2 users. - assert tf["last_1h"]["requests"] == 3 + # last_1h (>= 10000): r1, r2, r3, r1(dup) -> 4 requests. u1, u2 -> 2 users. + assert tf["last_1h"]["requests"] == 4 assert tf["last_1h"]["unique_users"] == 2 - # last_24h (>= 5000): r1..r5 -> 5 requests. u1, u2, u3 -> 3 users. - assert tf["last_24h"]["requests"] == 5 + # last_24h (>= 5000): r1..r5 + r1(dup) -> 6 requests. u1, u2, u3 -> 3 users. + assert tf["last_24h"]["requests"] == 6 assert tf["last_24h"]["unique_users"] == 3