diff --git a/prometheus_api_client/prometheus_connect.py b/prometheus_api_client/prometheus_connect.py index cd24cb4..444777a 100644 --- a/prometheus_api_client/prometheus_connect.py +++ b/prometheus_api_client/prometheus_connect.py @@ -1,4 +1,5 @@ """A Class for collection of metrics from a Prometheus Host.""" + from urllib.parse import urlparse import bz2 import os @@ -41,7 +42,7 @@ class PrometheusConnect: Example: {"http_proxy": "", "https_proxy": ""} :param session (Optional) Custom requests.Session to enable complex HTTP configuration :param timeout: (Optional) A timeout (in seconds) applied to all requests - :param method: (Optional) (str) HTTP Method (GET or POST) to use for Query APIs that allow POST + :param method: (Optional) (str) HTTP Method (GET or POST) to use for Query APIs that allow POST (/query, /query_range and /labels). Use POST for large and complex queries. Default is GET. """ @@ -55,7 +56,7 @@ def __init__( proxy: dict = None, session: Session = None, timeout: int = None, - method: str = "GET" + method: str = "GET", ): """Functions as a Constructor for the class PrometheusConnect.""" if url is None: @@ -69,7 +70,7 @@ def __init__( if not isinstance(method, str): raise TypeError("Method must be a string") - + method = method.upper() if method not in {"GET", "POST"}: raise ValueError("Method can only be GET or POST") @@ -130,7 +131,6 @@ def all_metrics(self, params: dict = None): self._all_metrics = self.get_label_values(label_name="__name__", params=params) return self._all_metrics - def get_series(self, start: datetime, end: datetime, params: dict = None): """ Get a list series happening between start and end times. @@ -165,7 +165,6 @@ def get_series(self, start: datetime, end: datetime, params: dict = None): ) return labels - def get_label_names(self, params: dict = None): """ Get a list of all labels. @@ -480,7 +479,13 @@ def custom_query(self, query: str, params: dict = None, timeout: int = None): return data def custom_query_range( - self, query: str, start_time: datetime, end_time: datetime, step: str, params: dict = None, timeout: int = None + self, + query: str, + start_time: datetime, + end_time: datetime, + step: str, + params: dict = None, + timeout: int = None, ): """ Send a query_range to a Prometheus Host. @@ -597,41 +602,40 @@ def get_metric_aggregation( np_array = numpy.array(query_values) for operation in operations: if operation == "sum": - aggregated_values["sum"] = numpy.sum(np_array) + aggregated_values["sum"] = numpy.nansum(np_array) elif operation == "max": - aggregated_values["max"] = numpy.max(np_array) + aggregated_values["max"] = numpy.nanmax(np_array) elif operation == "min": - aggregated_values["min"] = numpy.min(np_array) + aggregated_values["min"] = numpy.nanmin(np_array) elif operation == "average": - aggregated_values["average"] = numpy.average(np_array) + aggregated_values["average"] = numpy.nanmean(np_array) elif operation.startswith("percentile"): percentile = float(operation.split("_")[1]) - aggregated_values["percentile_" + str(percentile)] = numpy.percentile( - query_values, percentile + aggregated_values["percentile_" + str(percentile)] = numpy.nanpercentile( + np_array, percentile ) elif operation == "deviation": - aggregated_values["deviation"] = numpy.std(np_array) + aggregated_values["deviation"] = numpy.nanstd(np_array) elif operation == "variance": - aggregated_values["variance"] = numpy.var(np_array) + aggregated_values["variance"] = numpy.nanvar(np_array) else: raise TypeError("Invalid operation: " + operation) return aggregated_values - def get_scrape_pools(self) -> list[str]: """ Get a list of all scrape pools in activeTargets. """ scrape_pools = [] - for target in self.get_targets()['activeTargets']: - scrape_pools.append(target['scrapePool']) + for target in self.get_targets()["activeTargets"]: + scrape_pools.append(target["scrapePool"]) return list(set(scrape_pools)) def get_targets(self, state: str = None, scrape_pool: str = None): """ Get a list of all targets from Prometheus. - :param state: (str) Optional filter for target state ('active', 'dropped', 'any'). + :param state: (str) Optional filter for target state ('active', 'dropped', 'any'). If None, returns both active and dropped targets. :param scrape_pool: (str) Optional filter by scrape pool name :returns: (dict) A dictionary containing active and dropped targets @@ -641,9 +645,9 @@ def get_targets(self, state: str = None, scrape_pool: str = None): """ params = {} if state: - params['state'] = state + params["state"] = state if scrape_pool: - params['scrapePool'] = scrape_pool + params["scrapePool"] = scrape_pool response = self._session.request( method="GET", @@ -660,8 +664,7 @@ def get_targets(self, state: str = None, scrape_pool: str = None): return response.json()["data"] else: raise PrometheusApiClientException( - "HTTP Status Code {} ({!r})".format( - response.status_code, response.content) + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) ) def get_target_metadata(self, target: dict[str, str], metric: str = None): @@ -679,12 +682,11 @@ def get_target_metadata(self, target: dict[str, str], metric: str = None): # Convert target dict to label selector string if metric: - params['metric'] = metric + params["metric"] = metric if target: - match_target = "{" + \ - ",".join(f'{k}="{v}"' for k, v in target.items()) + "}" - params['match_target'] = match_target + match_target = "{" + ",".join(f'{k}="{v}"' for k, v in target.items()) + "}" + params["match_target"] = match_target response = self._session.request( method="GET", @@ -701,8 +703,7 @@ def get_target_metadata(self, target: dict[str, str], metric: str = None): return response.json()["data"] else: raise PrometheusApiClientException( - "HTTP Status Code {} ({!r})".format( - response.status_code, response.content) + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) ) def get_metric_metadata(self, metric: str, limit: int = None, limit_per_metric: int = None): @@ -721,13 +722,13 @@ def get_metric_metadata(self, metric: str, limit: int = None, limit_per_metric: params = {} if metric: - params['metric'] = metric + params["metric"] = metric if limit: - params['limit'] = limit + params["limit"] = limit if limit_per_metric: - params['limit_per_metric'] = limit_per_metric + params["limit_per_metric"] = limit_per_metric response = self._session.request( method="GET", @@ -745,15 +746,16 @@ def get_metric_metadata(self, metric: str, limit: int = None, limit_per_metric: formatted_data = [] for k, v in data.items(): for v_ in v: - formatted_data.append({ - "metric_name": k, - "type": v_.get('type', 'unknown'), - "help": v_.get('help', ''), - "unit": v_.get('unit', '') - }) + formatted_data.append( + { + "metric_name": k, + "type": v_.get("type", "unknown"), + "help": v_.get("help", ""), + "unit": v_.get("unit", ""), + } + ) return formatted_data else: raise PrometheusApiClientException( - "HTTP Status Code {} ({!r})".format( - response.status_code, response.content) + "HTTP Status Code {} ({!r})".format(response.status_code, response.content) ) diff --git a/tests/test_prometheus_connect.py b/tests/test_prometheus_connect.py index d2cf9a4..40a2753 100644 --- a/tests/test_prometheus_connect.py +++ b/tests/test_prometheus_connect.py @@ -1,4 +1,6 @@ """Test module for class PrometheusConnect.""" + +import math import unittest import os from datetime import datetime, timedelta @@ -127,6 +129,7 @@ def test_get_metric_aggregation(self): # noqa D102 def test_get_metric_aggregation_with_incorrect_input_types(self): # noqa D102 with self.assertRaises(TypeError, msg="operations accepted invalid value type"): _ = self.pc.get_metric_aggregation(query="up", operations="sum") + def test_retry_on_error(self): # noqa D102 retry = Retry(total=3, backoff_factor=0.1, status_forcelist=[400]) pc = PrometheusConnect(url=self.prometheus_host, disable_ssl=False, retry=retry) @@ -156,41 +159,38 @@ def test_get_scrape_pools(self): # noqa D102 self.assertTrue(len(scrape_pools) > 0, "no scrape pools found") self.assertIsInstance(scrape_pools[0], str) - def test_get_targets(self): # PR #295 + def test_get_targets(self): # PR #295 targets = self.pc.get_targets() self.assertIsInstance(targets, dict) - self.assertIn('activeTargets', targets) - self.assertIsInstance(targets['activeTargets'], list) + self.assertIn("activeTargets", targets) + self.assertIsInstance(targets["activeTargets"], list) # Test with state filter - active_targets = self.pc.get_targets(state='active') + active_targets = self.pc.get_targets(state="active") self.assertIsInstance(active_targets, dict) - self.assertIn('activeTargets', active_targets) + self.assertIn("activeTargets", active_targets) # Test with scrape_pool filter if len(scrape_pools := self.pc.get_scrape_pools()) > 0: pool_targets = self.pc.get_targets(scrape_pool=scrape_pools[0]) self.assertIsInstance(pool_targets, dict) - def test_get_target_metadata(self): # PR #295 + def test_get_target_metadata(self): # PR #295 # Get a target to test with targets = self.pc.get_targets() - if len(targets['activeTargets']) > 0: - target = { - 'job': targets['activeTargets'][0]['labels']['job'] - } + if len(targets["activeTargets"]) > 0: + target = {"job": targets["activeTargets"][0]["labels"]["job"]} metadata = self.pc.get_target_metadata(target) self.assertIsInstance(metadata, list) # Test with metric filter if len(metadata) > 0: - metric_name = metadata[0]['metric'] - filtered_metadata = self.pc.get_target_metadata( - target, metric=metric_name) + metric_name = metadata[0]["metric"] + filtered_metadata = self.pc.get_target_metadata(target, metric=metric_name) self.assertIsInstance(filtered_metadata, list) self.assertTrue( - all(item['target']['job'] == target['job'] for item in filtered_metadata)) - + all(item["target"]["job"] == target["job"] for item in filtered_metadata) + ) def test_get_metric_metadata(self): # PR #295 metadata = self.pc.get_metric_metadata(metric=None) @@ -198,18 +198,17 @@ def test_get_metric_metadata(self): # PR #295 self.assertTrue(len(metadata) > 0, "no metric metadata found") # Check structure of metadata - self.assertIn('metric_name', metadata[0]) - self.assertIn('type', metadata[0]) - self.assertIn('help', metadata[0]) - self.assertIn('unit', metadata[0]) + self.assertIn("metric_name", metadata[0]) + self.assertIn("type", metadata[0]) + self.assertIn("help", metadata[0]) + self.assertIn("unit", metadata[0]) # Test with specific metric if len(metadata) > 0: - metric_name = metadata[0]['metric_name'] + metric_name = metadata[0]["metric_name"] filtered_metadata = self.pc.get_metric_metadata(metric=metric_name) self.assertIsInstance(filtered_metadata, list) - self.assertTrue( - all(item['metric_name'] == metric_name for item in filtered_metadata)) + self.assertTrue(all(item["metric_name"] == metric_name for item in filtered_metadata)) # Test with limit limited_metadata = self.pc.get_metric_metadata(metric_name, limit=1) @@ -236,7 +235,7 @@ def test_method_argument_accepts_get_and_post(self): # Invalid value with self.assertRaises(ValueError): PrometheusConnect(url=self.prometheus_host, disable_ssl=False, method="PUT") - + def test_post_method_for_supported_functions(self): """Test that PrometheusConnect uses POST for supported endpoints when method='POST', and returns a value.""" pc = PrometheusConnect(url=self.prometheus_host, disable_ssl=False, method="POST") @@ -246,35 +245,47 @@ def test_post_method_for_supported_functions(self): # custom_query should use POST and return something (or raise) try: result = pc.custom_query("up") - self.assertTrue(result is not None and result != [], "no metrics received from prometheus") + self.assertTrue( + result is not None and result != [], "no metrics received from prometheus" + ) except Exception as e: self.fail(f"custom_query('up') raised an unexpected exception: {e}") # custom_query_range should use POST and return something (or raise) try: - result = pc.custom_query_range("up", start_time=start_time, end_time=end_time, step="15") - self.assertTrue(result is not None and result != [], "no metrics received from prometheus") + result = pc.custom_query_range( + "up", start_time=start_time, end_time=end_time, step="15" + ) + self.assertTrue( + result is not None and result != [], "no metrics received from prometheus" + ) except Exception as e: self.fail(f"custom_query_range('up', ...) raised an unexpected exception: {e}") # get_label_names should use POST and return something (or raise) try: result = pc.get_label_names() - self.assertTrue(result is not None and result != [], "no metrics received from prometheus") + self.assertTrue( + result is not None and result != [], "no metrics received from prometheus" + ) except Exception as e: self.fail(f"get_label_names() raised an unexpected exception: {e}") # get_current_metric_value should use POST and return something (or raise) try: result = pc.get_current_metric_value("up") - self.assertTrue(result is not None and result != [], "no metrics received from prometheus") + self.assertTrue( + result is not None and result != [], "no metrics received from prometheus" + ) except Exception as e: self.fail(f"get_current_metric_value('up') raised an unexpected exception: {e}") # get_metric_range_data should use POST and return something (or raise) try: result = pc.get_metric_range_data("up", start_time=start_time, end_time=end_time) - self.assertTrue(result is not None and result != [], "no metrics received from prometheus") + self.assertTrue( + result is not None and result != [], "no metrics received from prometheus" + ) except Exception as e: self.fail(f"get_metric_range_data('up', ...) raised an unexpected exception: {e}") @@ -358,12 +369,14 @@ def test_all_metrics_method(self): # noqa D102 request = handler.requests[0] self.assertEqual(request.path_url, "/api/v1/label/__name__/values") - def test_get_series_method(self): # noqa D102 - series_payload = {"status": "success", "data": [ - {"__name__": "up", "job": "prometheus", "instance": "localhost:9090"}, - {"__name__": "up", "job": "node", "instance": "localhost:9100"} - ]} + series_payload = { + "status": "success", + "data": [ + {"__name__": "up", "job": "prometheus", "instance": "localhost:9090"}, + {"__name__": "up", "job": "node", "instance": "localhost:9100"}, + ], + } with self.mock_response(series_payload) as handler: start_time = datetime.now() - timedelta(hours=1) @@ -395,5 +408,60 @@ def test_get_label_values_method(self): # noqa D102 request = handler.requests[0] self.assertEqual(request.path_url, "/api/v1/label/label_name/values") + def test_get_metric_aggregation_with_nan_values(self): # noqa D102 + """Test that aggregation functions handle NaN values correctly.""" + # Mock response with NaN values in the data + query_response_with_nan = { + "status": "success", + "data": { + "result": [ + {"metric": {"__name__": "test_metric"}, "value": [1638360000, "1.0"]}, + {"metric": {"__name__": "test_metric"}, "value": [1638360015, "NaN"]}, + {"metric": {"__name__": "test_metric"}, "value": [1638360030, "2.0"]}, + {"metric": {"__name__": "test_metric"}, "value": [1638360045, "3.0"]}, + ] + }, + } + + operations = ["sum", "max", "min", "variance", "percentile_50", "deviation", "average"] + + with self.mock_response(query_response_with_nan): + aggregated_values = self.pc.get_metric_aggregation( + query="test_metric", operations=operations + ) + + # With NaN-handling functions, we should get valid results + # sum should be 6.0 (1 + 2 + 3, ignoring NaN) + self.assertIsNotNone(aggregated_values) + self.assertIn("sum", aggregated_values) + self.assertIn("max", aggregated_values) + self.assertIn("min", aggregated_values) + self.assertIn("average", aggregated_values) + self.assertIn("variance", aggregated_values) + self.assertIn("deviation", aggregated_values) + self.assertIn("percentile_50.0", aggregated_values) + + # Verify that results are not NaN + self.assertFalse(math.isnan(aggregated_values["sum"]), "Sum should not be NaN") + self.assertFalse(math.isnan(aggregated_values["max"]), "Max should not be NaN") + self.assertFalse(math.isnan(aggregated_values["min"]), "Min should not be NaN") + self.assertFalse(math.isnan(aggregated_values["average"]), "Average should not be NaN") + self.assertFalse( + math.isnan(aggregated_values["variance"]), "Variance should not be NaN" + ) + self.assertFalse( + math.isnan(aggregated_values["deviation"]), "Deviation should not be NaN" + ) + self.assertFalse( + math.isnan(aggregated_values["percentile_50.0"]), "Percentile should not be NaN" + ) + + # Verify expected values (approximately) + self.assertAlmostEqual(aggregated_values["sum"], 6.0, places=5) + self.assertAlmostEqual(aggregated_values["max"], 3.0, places=5) + self.assertAlmostEqual(aggregated_values["min"], 1.0, places=5) + self.assertAlmostEqual(aggregated_values["average"], 2.0, places=5) + + if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()