 
 import traceback
 from abc import abstractmethod
-from typing import Union
+from typing import List, Optional, Union
 
 from sqlalchemy import Column
 
-from metadata.data_quality.validations.base_test_handler import BaseTestValidator
+from metadata.data_quality.validations.base_test_handler import (
+    BaseTestValidator,
+    DimensionInfo,
+    DimensionResult,
+    TestEvaluation,
+)
 from metadata.generated.schema.tests.basic import (
     TestCaseResult,
     TestCaseStatus,
 
 logger = test_suite_logger()
 
-MAX = "max"
-
 
 class BaseColumnValueMaxToBeBetweenValidator(BaseTestValidator):
     """Validator for column value max to be between test case"""
@@ -46,9 +49,16 @@ def _run_validation(self) -> TestCaseResult:
         Returns:
             TestCaseResult: The test case result for the overall validation
         """
+        test_params = self._get_test_parameters()
+
         try:
             column: Union[SQALikeColumn, Column] = self._get_column_name()
-            res = self._run_results(Metrics.MAX, column)
+            max_value = self._run_results(Metrics.MAX, column)
+
+            metric_values = {
+                Metrics.MAX.name: max_value,
+            }
+
         except (ValueError, RuntimeError) as exc:
             msg = f"Error computing {self.test_case.fullyQualifiedName}: {exc}"  # type: ignore
             logger.debug(traceback.format_exc())
@@ -57,25 +67,210 @@ def _run_validation(self) -> TestCaseResult:
                 self.execution_date,
                 TestCaseStatus.Aborted,
                 msg,
-                [TestResultValue(name=MAX, value=None)],
+                [TestResultValue(name=Metrics.MAX.name, value=None)],
             )
 
-        min_bound = self.get_min_bound("minValueForMaxInCol")
-        max_bound = self.get_max_bound("maxValueForMaxInCol")
+        evaluation = self._evaluate_test_condition(metric_values, test_params)
+        result_message = self._format_result_message(
+            metric_values, test_params=test_params
+        )
+        test_result_values = self._get_test_result_values(metric_values)
 
         return self.get_test_case_result_object(
             self.execution_date,
-            self.get_test_case_status(min_bound <= res <= max_bound),
-            f"Found max={res} vs. the expected min={min_bound}, max={max_bound}.",
-            [TestResultValue(name=MAX, value=str(res))],
-            min_bound=min_bound,
-            max_bound=max_bound,
+            self.get_test_case_status(evaluation["matched"]),
+            result_message,
+            test_result_values,
+            min_bound=test_params["minValueForMaxInCol"],
+            max_bound=test_params["maxValueForMaxInCol"],
         )
 
+    def _run_dimensional_validation(self) -> List[DimensionResult]:
+        """Execute dimensional validation for column value max to be between
+
+        The new approach runs separate queries for each dimension column instead of
+        combining them with GROUP BY. For example, if dimensionColumns = ["region", "category"],
+        this method will:
+        1. Run one query: GROUP BY region -> {"North America": result1, "Europe": result2}
+        2. Run another query: GROUP BY category -> {"Electronics": result3, "Clothing": result4}
+
+        Returns:
+            List[DimensionResult]: List of dimension-specific test results
+        """
+        try:
+            dimension_columns = self.test_case.dimensionColumns or []
+            if not dimension_columns:
+                return []
+
+            column: Union[SQALikeColumn, Column] = self._get_column_name()
+
+            test_params = self._get_test_parameters()
+            metrics_to_compute = self._get_metrics_to_compute(test_params)
+
+            dimension_results = []
+            for dimension_column in dimension_columns:
+                try:
+                    dimension_col = self._get_column_name(dimension_column)
+
+                    single_dimension_results = self._execute_dimensional_validation(
+                        column, dimension_col, metrics_to_compute, test_params
+                    )
+
+                    dimension_results.extend(single_dimension_results)
+
+                except Exception as exc:
+                    logger.warning(
+                        f"Error executing dimensional query for column {dimension_column}: {exc}"
+                    )
+                    logger.debug(traceback.format_exc())
+                    continue
+
+            return dimension_results
+
+        except Exception as exc:
+            logger.warning(f"Error executing dimensional validation: {exc}")
+            logger.debug(traceback.format_exc())
+            return []
+
+    def _get_test_parameters(self) -> dict:
+        """Get test parameters for this validator
+
+        Returns:
+            dict: Test parameters including min and max bounds
+        """
+        return {
+            "minValueForMaxInCol": self.get_min_bound("minValueForMaxInCol"),
+            "maxValueForMaxInCol": self.get_max_bound("maxValueForMaxInCol"),
+        }
+
+    def _get_metrics_to_compute(self, test_params: Optional[dict] = None) -> dict:
+        """Get metrics that need to be computed for this test
+
+        Args:
+            test_params: Optional test parameters (unused for max validator)
+
+        Returns:
+            dict: Dictionary mapping metric names to Metrics enum values
+        """
+        return {
+            Metrics.MAX.name: Metrics.MAX,
+        }
+
+    def _evaluate_test_condition(
+        self, metric_values: dict, test_params: dict
+    ) -> TestEvaluation:
+        """Evaluate the max-to-be-between test condition
+
+        For the max test, the condition passes if the max value is within the specified bounds.
+        Since this is a statistical validator (group-level), passed/failed row counts are not applicable.
+
+        Args:
+            metric_values: Dictionary with keys from Metrics enum names
+                e.g., {"MAX": 42.5}
+            test_params: Dictionary with 'minValueForMaxInCol' and 'maxValueForMaxInCol'
+
+        Returns:
+            dict with keys:
+                - matched: bool - whether test passed (max within bounds)
+                - passed_rows: None - not applicable for statistical validators
+                - failed_rows: None - not applicable for statistical validators
+                - total_rows: None - not applicable for statistical validators
+        """
+        max_value = metric_values[Metrics.MAX.name]
+        min_bound = test_params["minValueForMaxInCol"]
+        max_bound = test_params["maxValueForMaxInCol"]
+
+        matched = min_bound <= max_value <= max_bound
+
+        return {
+            "matched": matched,
+            "passed_rows": None,
+            "failed_rows": None,
+            "total_rows": None,
+        }
+
+    def _format_result_message(
+        self,
+        metric_values: dict,
+        dimension_info: Optional[DimensionInfo] = None,
+        test_params: Optional[dict] = None,
+    ) -> str:
+        """Format the result message for max-to-be-between test
+
+        Args:
+            metric_values: Dictionary with Metrics enum names as keys
+            dimension_info: Optional DimensionInfo with dimension details
+            test_params: Test parameters with min/max bounds. Required for this test.
+
+        Returns:
+            str: Formatted result message
+        """
+        if test_params is None:
+            raise ValueError(
+                "test_params is required for columnValueMaxToBeBetween._format_result_message"
+            )
+
+        max_value = metric_values[Metrics.MAX.name]
+        min_bound = test_params["minValueForMaxInCol"]
+        max_bound = test_params["maxValueForMaxInCol"]
+
+        if dimension_info:
+            return (
+                f"Dimension {dimension_info['dimension_name']}={dimension_info['dimension_value']}: "
+                f"Found max={max_value} vs. the expected min={min_bound}, max={max_bound}"
+            )
+        else:
+            return f"Found max={max_value} vs. the expected min={min_bound}, max={max_bound}."
+
+    def _get_test_result_values(self, metric_values: dict) -> List[TestResultValue]:
+        """Get test result values for max-to-be-between test
+
+        Args:
+            metric_values: Dictionary with Metrics enum names as keys
+
+        Returns:
+            List[TestResultValue]: Test result values for the test case
+        """
+        return [
+            TestResultValue(
+                name=Metrics.MAX.name,
+                value=str(metric_values[Metrics.MAX.name]),
+            ),
+        ]
+
     @abstractmethod
-    def _get_column_name(self):
+    def _get_column_name(self, column_name: Optional[str] = None):
+        """Get column object from entity link or column name
+
+        Args:
+            column_name: Optional column name. If None, returns the test case column.
+
+        Returns:
+            Column object
+        """
         raise NotImplementedError
 
     @abstractmethod
     def _run_results(self, metric: Metrics, column: Union[SQALikeColumn, Column]):
         raise NotImplementedError
+
+    @abstractmethod
+    def _execute_dimensional_validation(
+        self,
+        column: Union[SQALikeColumn, Column],
+        dimension_col: Union[SQALikeColumn, Column],
+        metrics_to_compute: dict,
+        test_params: dict,
+    ) -> List[DimensionResult]:
+        """Execute dimensional validation query for a single dimension column
+
+        Args:
+            column: The column being tested (e.g., revenue)
+            dimension_col: The dimension column to group by (e.g., region)
+            metrics_to_compute: Dict mapping metric names to Metrics enum values
+            test_params: Test parameters including min and max bounds
+
+        Returns:
+            List of DimensionResult objects for each dimension value
+        """
+        raise NotImplementedError
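
A small, self-contained sketch of the per-dimension evaluation flow that the _run_dimensional_validation docstring describes: one pass per dimension column, a max per dimension value, and the same bounds check used in _evaluate_test_condition. The sample rows, column names, and bounds below are invented for illustration, and the sketch deliberately avoids the real DimensionResult, runner, and Metrics classes from this module.

from collections import defaultdict

# Hypothetical sample data and bounds (not taken from this commit).
rows = [
    {"region": "North America", "category": "Electronics", "revenue": 120.0},
    {"region": "North America", "category": "Clothing", "revenue": 80.0},
    {"region": "Europe", "category": "Electronics", "revenue": 95.0},
    {"region": "Europe", "category": "Clothing", "revenue": 60.0},
]
min_bound, max_bound = 50.0, 100.0  # minValueForMaxInCol / maxValueForMaxInCol
dimension_columns = ["region", "category"]

for dimension_column in dimension_columns:
    # Equivalent of one "GROUP BY <dimension_column>" query computing MAX(revenue).
    max_per_value = defaultdict(lambda: float("-inf"))
    for row in rows:
        key = row[dimension_column]
        max_per_value[key] = max(max_per_value[key], row["revenue"])

    # Apply the same condition per dimension value that the validator applies overall.
    for dimension_value, max_value in max_per_value.items():
        matched = min_bound <= max_value <= max_bound
        print(
            f"Dimension {dimension_column}={dimension_value}: "
            f"Found max={max_value} vs. the expected min={min_bound}, max={max_bound} "
            f"-> {'Success' if matched else 'Failed'}"
        )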