From 61651b56f67c26877b8fd84a6ac0ccebbd5a258f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Dec 2025 03:59:01 +0000 Subject: [PATCH 1/3] Initial plan From 91b9e763c0f0f2f5fb45c136d04812948200c17d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Dec 2025 04:04:22 +0000 Subject: [PATCH 2/3] Add comprehensive test coverage for PrometheusConnect methods Co-authored-by: 4n4nd <22333506+4n4nd@users.noreply.github.com> --- tests/test_prometheus_connect.py | 561 +++++++++++++++++++++++++++++++ 1 file changed, 561 insertions(+) diff --git a/tests/test_prometheus_connect.py b/tests/test_prometheus_connect.py index d2cf9a4..ffb193b 100644 --- a/tests/test_prometheus_connect.py +++ b/tests/test_prometheus_connect.py @@ -395,5 +395,566 @@ def test_get_label_values_method(self): # noqa D102 request = handler.requests[0] self.assertEqual(request.path_url, "/api/v1/label/label_name/values") + def test_check_prometheus_connection(self): # noqa D102 + # Test successful connection + with self.mock_response("OK", status_code=200) as handler: + result = self.pc.check_prometheus_connection() + self.assertTrue(result) + self.assertEqual(handler.call_count, 1) + request = handler.requests[0] + self.assertEqual(request.path_url, "/") + + # Test failed connection + with self.mock_response("Not Found", status_code=404): + result = self.pc.check_prometheus_connection() + self.assertFalse(result) + + # Test with parameters + with self.mock_response("OK", status_code=200) as handler: + result = self.pc.check_prometheus_connection(params={"test": "value"}) + self.assertTrue(result) + self.assertIn("test=value", handler.requests[0].url) + + def test_get_targets(self): # noqa D102 + targets_payload = { + "status": "success", + "data": { + "activeTargets": [ + { + "discoveredLabels": {"__address__": "localhost:9090"}, + "labels": {"job": "prometheus", "instance": "localhost:9090"}, + "scrapePool": "prometheus", + "scrapeUrl": "http://localhost:9090/metrics", + "health": "up", + "lastError": "", + "lastScrape": "2024-01-01T00:00:00.000Z", + "lastScrapeDuration": 0.001, + } + ], + "droppedTargets": [], + }, + } + + with self.mock_response(targets_payload) as handler: + result = self.pc.get_targets() + self.assertIsInstance(result, dict) + self.assertIn("activeTargets", result) + self.assertEqual(len(result["activeTargets"]), 1) + self.assertEqual(handler.call_count, 1) + request = handler.requests[0] + self.assertEqual(request.path_url, "/api/v1/targets") + + # Test with state filter + with self.mock_response(targets_payload) as handler: + result = self.pc.get_targets(state="active") + self.assertIsInstance(result, dict) + self.assertIn("state=active", handler.requests[0].url) + + # Test with scrape_pool filter + with self.mock_response(targets_payload) as handler: + result = self.pc.get_targets(scrape_pool="prometheus") + self.assertIsInstance(result, dict) + self.assertIn("scrapePool=prometheus", handler.requests[0].url) + + # Test error handling + with self.mock_response("Internal Server Error", status_code=500): + with self.assertRaises(PrometheusApiClientException) as exc: + self.pc.get_targets() + self.assertIn("HTTP Status Code 500", str(exc.exception)) + + def test_get_scrape_pools(self): # noqa D102 + targets_payload = { + "status": "success", + "data": { + "activeTargets": [ + { + "scrapePool": "prometheus", + "labels": {"job": "prometheus"}, + }, + { + "scrapePool": "node-exporter", + "labels": {"job": "node"}, + }, + { + "scrapePool": "prometheus", + "labels": {"job": "prometheus"}, + }, + ], + "droppedTargets": [], + }, + } + + with self.mock_response(targets_payload): + result = self.pc.get_scrape_pools() + self.assertIsInstance(result, list) + # Should have unique scrape pools + self.assertEqual(len(result), 2) + self.assertIn("prometheus", result) + self.assertIn("node-exporter", result) + + def test_get_target_metadata(self): # noqa D102 + metadata_payload = { + "status": "success", + "data": [ + { + "target": {"job": "prometheus", "instance": "localhost:9090"}, + "metric": "up", + "type": "gauge", + "help": "The scraping target is up", + "unit": "", + }, + { + "target": {"job": "prometheus", "instance": "localhost:9090"}, + "metric": "process_cpu_seconds_total", + "type": "counter", + "help": "Total CPU time", + "unit": "seconds", + }, + ], + } + + # Test with target filter only + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_target_metadata(target={"job": "prometheus"}) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + self.assertEqual(handler.call_count, 1) + request = handler.requests[0] + self.assertTrue(request.path_url.startswith("/api/v1/targets/metadata")) + self.assertIn('match_target=%7Bjob%3D%22prometheus%22%7D', request.url) + + # Test with target and metric filter + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_target_metadata(target={"job": "prometheus"}, metric="up") + self.assertIsInstance(result, list) + self.assertIn("metric=up", handler.requests[0].url) + + # Test error handling + with self.mock_response("Internal Server Error", status_code=500): + with self.assertRaises(PrometheusApiClientException) as exc: + self.pc.get_target_metadata(target={"job": "test"}) + self.assertIn("HTTP Status Code 500", str(exc.exception)) + + def test_get_metric_metadata(self): # noqa D102 + metadata_payload = { + "status": "success", + "data": { + "up": [ + { + "type": "gauge", + "help": "The scraping target is up", + "unit": "", + } + ], + "process_cpu_seconds_total": [ + { + "type": "counter", + "help": "Total CPU time", + "unit": "seconds", + } + ], + }, + } + + # Test without filters + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_metric_metadata(metric=None) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 2) + self.assertEqual(handler.call_count, 1) + request = handler.requests[0] + self.assertEqual(request.path_url, "/api/v1/metadata") + # Check formatted structure + self.assertIn("metric_name", result[0]) + self.assertIn("type", result[0]) + self.assertIn("help", result[0]) + self.assertIn("unit", result[0]) + + # Test with metric filter + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_metric_metadata(metric="up") + self.assertIsInstance(result, list) + self.assertIn("metric=up", handler.requests[0].url) + + # Test with limit parameter + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_metric_metadata(metric="up", limit=1) + self.assertIsInstance(result, list) + self.assertIn("limit=1", handler.requests[0].url) + + # Test with limit_per_metric parameter + with self.mock_response(metadata_payload) as handler: + result = self.pc.get_metric_metadata(metric="up", limit_per_metric=1) + self.assertIsInstance(result, list) + self.assertIn("limit_per_metric=1", handler.requests[0].url) + + # Test error handling + with self.mock_response("Internal Server Error", status_code=500): + with self.assertRaises(PrometheusApiClientException) as exc: + self.pc.get_metric_metadata(metric="test") + self.assertIn("HTTP Status Code 500", str(exc.exception)) + + def test_custom_query_with_timeout(self): # noqa D102 + query_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up", "job": "prometheus"}, + "value": [1609459200, "1"], + } + ], + }, + } + + with self.mock_response(query_payload): + # Test custom timeout override + result = self.pc.custom_query("up", timeout=30) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 1) + + def test_custom_query_range_with_timeout(self): # noqa D102 + query_range_payload = { + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"__name__": "up", "job": "prometheus"}, + "values": [[1609459200, "1"], [1609459260, "1"]], + } + ], + }, + } + + with self.mock_response(query_range_payload): + start_time = datetime.now() - timedelta(minutes=10) + end_time = datetime.now() + # Test custom timeout override + result = self.pc.custom_query_range("up", start_time, end_time, "60", timeout=30) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 1) + + def test_get_current_metric_value_with_label_config(self): # noqa D102 + metric_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up", "job": "prometheus", "cluster": "test"}, + "value": [1609459200, "1"], + } + ], + }, + } + + # Test with label configuration + with self.mock_response(metric_payload) as handler: + result = self.pc.get_current_metric_value( + "up", label_config={"job": "prometheus", "cluster": "test"} + ) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 1) + # Verify query string contains label config + request = handler.requests[0] + self.assertIn("up%7B", request.url) # URL encoded { + + # Test without label configuration + with self.mock_response(metric_payload): + result = self.pc.get_current_metric_value("up") + self.assertIsInstance(result, list) + + def test_get_metric_range_data_edge_cases(self): # noqa D102 + metric_payload = { + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"__name__": "up"}, + "values": [[1609459200, "1"]], + } + ], + }, + } + + start_time = datetime.now() - timedelta(minutes=10) + end_time = datetime.now() + + # Test with ValueError for end_time before start_time + with self.assertRaises(ValueError, msg="end_time must not be before start_time"): + self.pc.get_metric_range_data( + "up", start_time=end_time, end_time=start_time + ) + + def test_constructor_with_invalid_method(self): # noqa D102 + # Test that constructor validates method parameter + with self.assertRaises(TypeError): + PrometheusConnect(url="http://test.com", method=123) + + with self.assertRaises(ValueError): + PrometheusConnect(url="http://test.com", method="DELETE") + + # Test case-insensitive method handling + pc = PrometheusConnect(url="http://test.com", method="get") + self.assertEqual(pc._method, "GET") + + pc = PrometheusConnect(url="http://test.com", method="post") + self.assertEqual(pc._method, "POST") + + def test_constructor_with_custom_session(self): # noqa D102 + # Test with custom session + custom_session = requests.Session() + custom_session.headers.update({"X-Custom-Header": "test"}) + pc = PrometheusConnect(url="http://test.com", session=custom_session) + self.assertEqual(pc._session, custom_session) + self.assertIn("X-Custom-Header", pc._session.headers) + + def test_constructor_with_auth_and_proxy(self): # noqa D102 + # Test with auth tuple + auth_tuple = ("user", "password") + pc = PrometheusConnect(url="http://test.com", auth=auth_tuple) + self.assertEqual(pc.auth, auth_tuple) + + # Test with proxy + proxy_dict = {"http": "http://proxy:8080", "https": "https://proxy:8080"} + pc = PrometheusConnect(url="http://test.com", proxy=proxy_dict) + self.assertEqual(pc._session.proxies, proxy_dict) + + def test_constructor_with_timeout(self): # noqa D102 + # Test with timeout + pc = PrometheusConnect(url="http://test.com", timeout=60) + self.assertEqual(pc._timeout, 60) + + # Test default timeout is None + pc_default = PrometheusConnect(url="http://test.com") + self.assertIsNone(pc_default._timeout) + + def test_constructor_with_missing_url(self): # noqa D102 + # Test that url is required + with self.assertRaises(TypeError): + PrometheusConnect(url=None) + + def test_get_metric_aggregation_with_no_operations(self): # noqa D102 + # Test with empty operations list + result = self.pc.get_metric_aggregation( + query="up", operations=[], start_time=datetime.now() - timedelta(minutes=10), end_time=datetime.now() + ) + self.assertIsNone(result) + + def test_get_metric_aggregation_with_invalid_operation(self): # noqa D102 + query_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up"}, + "value": [1609459200, "1.5"], + } + ], + }, + } + + with self.mock_response(query_payload): + with self.assertRaises(TypeError, msg="Invalid operation"): + self.pc.get_metric_aggregation(query="up", operations=["invalid_operation"]) + + def test_get_metric_aggregation_with_no_results(self): # noqa D102 + empty_query_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [], + }, + } + + with self.mock_response(empty_query_payload): + result = self.pc.get_metric_aggregation(query="nonexistent_metric", operations=["sum"]) + self.assertIsNone(result) + + def test_get_metric_aggregation_percentile(self): # noqa D102 + query_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up"}, + "value": [1609459200, "1.5"], + }, + { + "metric": {"__name__": "up"}, + "value": [1609459200, "2.5"], + }, + { + "metric": {"__name__": "up"}, + "value": [1609459200, "3.5"], + }, + ], + }, + } + + with self.mock_response(query_payload): + result = self.pc.get_metric_aggregation(query="up", operations=["percentile_50", "percentile_95"]) + self.assertIsInstance(result, dict) + # Note: The implementation adds ".0" to percentile keys + self.assertIn("percentile_50.0", result) + self.assertIn("percentile_95.0", result) + + def test_get_metric_range_data_with_params(self): # noqa D102 + metric_payload = { + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"__name__": "up"}, + "values": [[1609459200, "1"]], + } + ], + }, + } + + start_time = datetime.now() - timedelta(minutes=10) + end_time = datetime.now() + + # Test with additional params + with self.mock_response(metric_payload) as handler: + result = self.pc.get_metric_range_data( + "up", start_time=start_time, end_time=end_time, params={"timeout": "30s"} + ) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("timeout=30s", request.url) + + def test_custom_query_with_params(self): # noqa D102 + query_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up"}, + "value": [1609459200, "1"], + } + ], + }, + } + + # Test with additional params + with self.mock_response(query_payload) as handler: + result = self.pc.custom_query("up", params={"time": "1609459200"}) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("time=1609459200", request.url) + + def test_custom_query_range_with_params(self): # noqa D102 + query_range_payload = { + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"__name__": "up"}, + "values": [[1609459200, "1"], [1609459260, "1"]], + } + ], + }, + } + + start_time = datetime.now() - timedelta(minutes=10) + end_time = datetime.now() + + # Test with additional params + with self.mock_response(query_range_payload) as handler: + result = self.pc.custom_query_range( + "up", start_time, end_time, "60", params={"timeout": "30s"} + ) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("timeout=30s", request.url) + + def test_get_series_with_params(self): # noqa D102 + series_payload = { + "status": "success", + "data": [ + {"__name__": "up", "job": "prometheus"}, + ], + } + + start_time = datetime.now() - timedelta(hours=1) + end_time = datetime.now() + + # Test with match[] param + with self.mock_response(series_payload) as handler: + result = self.pc.get_series( + start=start_time, end=end_time, params={"match[]": "up"} + ) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("match", request.url) + + def test_get_label_names_with_params(self): # noqa D102 + labels_payload = {"status": "success", "data": ["job", "instance"]} + + # Test with match[] param + with self.mock_response(labels_payload) as handler: + result = self.pc.get_label_names(params={"match[]": "up"}) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("match", request.url) + + def test_all_metrics_with_params(self): # noqa D102 + metrics_payload = {"status": "success", "data": ["up", "alerts"]} + + # Test with time param + with self.mock_response(metrics_payload) as handler: + result = self.pc.all_metrics(params={"time": "1609459200"}) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("time=1609459200", request.url) + + def test_get_current_metric_value_with_params(self): # noqa D102 + metric_payload = { + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"__name__": "up"}, + "value": [1609459200, "1"], + } + ], + }, + } + + # Test with time param + with self.mock_response(metric_payload) as handler: + result = self.pc.get_current_metric_value("up", params={"time": "1609459200"}) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("time=1609459200", request.url) + + def test_get_label_values_with_params(self): # noqa D102 + values_payload = {"status": "success", "data": ["prometheus", "node"]} + + # Test with time param + with self.mock_response(values_payload) as handler: + result = self.pc.get_label_values("job", params={"time": "1609459200"}) + self.assertIsInstance(result, list) + # Verify params were passed + request = handler.requests[0] + self.assertIn("time=1609459200", request.url) + if __name__ == "__main__": unittest.main() \ No newline at end of file From 0cbac3ade224864d6d6166b380caad6fa079ef09 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Dec 2025 04:07:16 +0000 Subject: [PATCH 3/3] Fix deprecated assertRaises msg parameter in tests Co-authored-by: 4n4nd <22333506+4n4nd@users.noreply.github.com> --- tests/test_prometheus_connect.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_prometheus_connect.py b/tests/test_prometheus_connect.py index ffb193b..0ead08c 100644 --- a/tests/test_prometheus_connect.py +++ b/tests/test_prometheus_connect.py @@ -93,22 +93,22 @@ def test_get_metric_range_data_with_incorrect_input_types(self): # noqa D102 chunk_size = timedelta(minutes=7) end_time = datetime.now() - timedelta(minutes=10) - with self.assertRaises(ValueError, msg="specified chunk_size is too big"): + with self.assertRaises(ValueError): _ = self.pc.get_metric_range_data( metric_name="up", start_time=start_time, end_time=end_time, chunk_size=timedelta(minutes=30), ) - with self.assertRaises(TypeError, msg="start_time accepted invalid value type"): + with self.assertRaises(TypeError): _ = self.pc.get_metric_range_data( metric_name="up", start_time="20m", end_time=end_time, chunk_size=chunk_size ) - with self.assertRaises(TypeError, msg="end_time accepted invalid value type"): + with self.assertRaises(TypeError): _ = self.pc.get_metric_range_data( metric_name="up", start_time=start_time, end_time="10m", chunk_size=chunk_size ) - with self.assertRaises(TypeError, msg="chunk_size accepted invalid value type"): + with self.assertRaises(TypeError): _ = self.pc.get_metric_range_data( metric_name="up", start_time=start_time, end_time=end_time, chunk_size="10m" ) @@ -125,7 +125,7 @@ def test_get_metric_aggregation(self): # noqa D102 self.assertTrue(len(aggregated_values) > 0, "no values received after aggregating") def test_get_metric_aggregation_with_incorrect_input_types(self): # noqa D102 - with self.assertRaises(TypeError, msg="operations accepted invalid value type"): + with self.assertRaises(TypeError): _ = self.pc.get_metric_aggregation(query="up", operations="sum") def test_retry_on_error(self): # noqa D102 retry = Retry(total=3, backoff_factor=0.1, status_forcelist=[400]) @@ -684,7 +684,7 @@ def test_get_metric_range_data_edge_cases(self): # noqa D102 end_time = datetime.now() # Test with ValueError for end_time before start_time - with self.assertRaises(ValueError, msg="end_time must not be before start_time"): + with self.assertRaises(ValueError): self.pc.get_metric_range_data( "up", start_time=end_time, end_time=start_time ) @@ -759,7 +759,7 @@ def test_get_metric_aggregation_with_invalid_operation(self): # noqa D102 } with self.mock_response(query_payload): - with self.assertRaises(TypeError, msg="Invalid operation"): + with self.assertRaises(TypeError): self.pc.get_metric_aggregation(query="up", operations=["invalid_operation"]) def test_get_metric_aggregation_with_no_results(self): # noqa D102