33
44//! Vortex table provider metrics.
55use std:: sync:: Arc ;
6+ use std:: time:: Duration ;
67
78use datafusion_datasource:: file_scan_config:: FileScanConfig ;
89use datafusion_datasource:: source:: DataSourceExec ;
@@ -15,13 +16,16 @@ use datafusion_physical_plan::metrics::Gauge;
1516use datafusion_physical_plan:: metrics:: Label as DatafusionLabel ;
1617use datafusion_physical_plan:: metrics:: MetricValue as DatafusionMetricValue ;
1718use datafusion_physical_plan:: metrics:: MetricsSet ;
19+ use datafusion_physical_plan:: metrics:: Time ;
20+ use vortex:: error:: VortexExpect ;
21+ use vortex:: metrics:: Label ;
1822use vortex:: metrics:: Metric ;
19- use vortex:: metrics:: MetricId ;
20- use vortex:: metrics:: Tags ;
23+ use vortex:: metrics:: MetricValue ;
2124
2225use crate :: persistent:: source:: VortexSource ;
2326
/// Label key compared against when picking the partition out of a metric's labels.
pub(crate) static PARTITION_LABEL: &str = "partition";

/// Label key `file_path`.
// NOTE(review): presumably attached to per-file metrics; confirm against the call sites.
pub(crate) static PATH_LABEL: &str = "file_path";
2529
2630/// Extracts datafusion metrics from all VortexExec instances in
2731/// a given physical plan.
@@ -43,9 +47,8 @@ impl ExecutionPlanVisitor for VortexMetricsFinder {
4347 type Error = std:: convert:: Infallible ;
4448 fn pre_visit ( & mut self , plan : & dyn ExecutionPlan ) -> Result < bool , Self :: Error > {
4549 if let Some ( exec) = plan. as_any ( ) . downcast_ref :: < DataSourceExec > ( ) {
46- if let Some ( metrics) = exec. metrics ( ) {
47- self . 0 . push ( metrics) ;
48- }
50+ // Start with exec metrics or create a new set
51+ let mut set = exec. metrics ( ) . unwrap_or_default ( ) ;
4952
5053 // Include our own metrics from VortexSource
5154 if let Some ( file_scan) = exec. data_source ( ) . as_any ( ) . downcast_ref :: < FileScanConfig > ( )
@@ -54,92 +57,99 @@ impl ExecutionPlanVisitor for VortexMetricsFinder {
5457 . as_any ( )
5558 . downcast_ref :: < VortexSource > ( )
5659 {
57- let mut set = MetricsSet :: new ( ) ;
5860 for metric in scan
59- . vx_metrics ( )
61+ . metrics_registry ( )
6062 . snapshot ( )
6163 . iter ( )
62- . flat_map ( | ( id , metric ) | metric_to_datafusion ( id , metric ) )
64+ . flat_map ( metric_to_datafusion)
6365 {
6466 set. push ( Arc :: new ( metric) ) ;
6567 }
66-
67- self . 0 . push ( set) ;
6868 }
69- }
7069
71- Ok ( true )
70+ self . 0 . push ( set) ;
71+
72+ Ok ( false )
73+ } else {
74+ Ok ( true )
75+ }
7276 }
7377}
7478
75- fn metric_to_datafusion ( id : MetricId , metric : & Metric ) -> impl Iterator < Item = DatafusionMetric > {
76- let ( partition, labels) = tags_to_datafusion ( id . tags ( ) ) ;
77- metric_value_to_datafusion ( id . name ( ) , metric)
79+ fn metric_to_datafusion ( metric : & Metric ) -> impl Iterator < Item = DatafusionMetric > {
80+ let ( partition, labels) = labels_to_datafusion ( metric . labels ( ) ) ;
81+ metric_value_to_datafusion ( metric . name ( ) , metric. value ( ) )
7882 . into_iter ( )
7983 . map ( move |metric_value| {
8084 DatafusionMetric :: new_with_labels ( metric_value, partition, labels. clone ( ) )
8185 } )
8286}
8387
84- fn tags_to_datafusion ( tags : & Tags ) -> ( Option < usize > , Vec < DatafusionLabel > ) {
88+ fn labels_to_datafusion ( tags : & [ Label ] ) -> ( Option < usize > , Vec < DatafusionLabel > ) {
8589 tags. iter ( )
86- . fold ( ( None , Vec :: new ( ) ) , |( mut partition, mut labels) , ( k , v ) | {
87- if k == PARTITION_LABEL {
88- partition = v . parse ( ) . ok ( ) ;
90+ . fold ( ( None , Vec :: new ( ) ) , |( mut partition, mut labels) , metric | {
91+ if metric . key ( ) == PARTITION_LABEL {
92+ partition = metric . value ( ) . parse ( ) . ok ( ) ;
8993 } else {
90- labels. push ( DatafusionLabel :: new ( k. to_string ( ) , v. to_string ( ) ) ) ;
94+ labels. push ( DatafusionLabel :: new (
95+ metric. key ( ) . to_string ( ) ,
96+ metric. value ( ) . to_string ( ) ,
97+ ) ) ;
9198 }
9299 ( partition, labels)
93100 } )
94101}
95102
96- fn metric_value_to_datafusion ( name : & str , metric : & Metric ) -> Vec < DatafusionMetricValue > {
103+ fn metric_value_to_datafusion ( name : & str , metric : & MetricValue ) -> Vec < DatafusionMetricValue > {
97104 match metric {
98- Metric :: Counter ( counter) => counter
99- . count ( )
105+ MetricValue :: Counter ( counter) => counter
106+ . value ( )
100107 . try_into ( )
101108 . into_iter ( )
102109 . map ( |count| df_counter ( name. to_string ( ) , count) )
103110 . collect ( ) ,
104- Metric :: Histogram ( hist) => {
111+ MetricValue :: Histogram ( hist) => {
105112 let mut res = Vec :: new ( ) ;
106- if let Ok ( count) = hist. count ( ) . try_into ( ) {
107- res. push ( df_counter ( format ! ( "{name}_count" ) , count) ) ;
108- }
109- let snapshot = hist. snapshot ( ) ;
110- if let Ok ( max) = snapshot. max ( ) . try_into ( ) {
111- res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
112- }
113- if let Ok ( min) = snapshot. min ( ) . try_into ( ) {
114- res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
115- }
116- if let Some ( p90) = f_to_u ( snapshot. value ( 0.90 ) ) {
117- res. push ( df_gauge ( format ! ( "{name}_p95" ) , p90) ) ;
118- }
119- if let Some ( p99) = f_to_u ( snapshot. value ( 0.99 ) ) {
120- res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
113+
114+ res. push ( df_counter ( format ! ( "{name}_count" ) , hist. count ( ) ) ) ;
115+
116+ if !hist. is_empty ( ) {
117+ if let Some ( max) = f_to_u ( hist. quantile ( 1.0 ) . vortex_expect ( "must not be empty" ) ) {
118+ res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
119+ }
120+
121+ if let Some ( min) = f_to_u ( hist. quantile ( 0.0 ) . vortex_expect ( "must not be empty" ) ) {
122+ res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
123+ }
124+
125+ if let Some ( p95) = f_to_u ( hist. quantile ( 0.95 ) . vortex_expect ( "must not be empty" ) ) {
126+ res. push ( df_gauge ( format ! ( "{name}_p95" ) , p95) ) ;
127+ }
128+ if let Some ( p99) = f_to_u ( hist. quantile ( 0.99 ) . vortex_expect ( "must not be empty" ) ) {
129+ res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
130+ }
121131 }
132+
122133 res
123134 }
124- Metric :: Timer ( timer) => {
135+ MetricValue :: Timer ( timer) => {
125136 let mut res = Vec :: new ( ) ;
126- if let Ok ( count) = timer. count ( ) . try_into ( ) {
127- res. push ( df_counter ( format ! ( "{name}_count" ) , count) ) ;
128- }
129- let snapshot = timer. snapshot ( ) ;
130- if let Ok ( max) = snapshot. max ( ) . try_into ( ) {
131- // NOTE(os): unlike Time metrics, gauges allow custom aggregation
132- res. push ( df_gauge ( format ! ( "{name}_max" ) , max) ) ;
133- }
134- if let Ok ( min) = snapshot. min ( ) . try_into ( ) {
135- res. push ( df_gauge ( format ! ( "{name}_min" ) , min) ) ;
136- }
137- if let Some ( p95) = f_to_u ( snapshot. value ( 0.95 ) ) {
138- res. push ( df_gauge ( format ! ( "{name}_p95" ) , p95) ) ;
139- }
140- if let Some ( p99) = f_to_u ( snapshot. value ( 0.95 ) ) {
141- res. push ( df_gauge ( format ! ( "{name}_p99" ) , p99) ) ;
137+ res. push ( df_counter ( format ! ( "{name}_count" ) , timer. count ( ) ) ) ;
138+
139+ if !timer. is_empty ( ) {
140+ let max = timer. quantile ( 1.0 ) . vortex_expect ( "must not be empty" ) ;
141+ res. push ( df_timer ( format ! ( "{name}_max" ) , max) ) ;
142+
143+ let min = timer. quantile ( 0.0 ) . vortex_expect ( "must not be empty" ) ;
144+ res. push ( df_timer ( format ! ( "{name}_min" ) , min) ) ;
145+
146+ let p95 = timer. quantile ( 0.95 ) . vortex_expect ( "must not be empty" ) ;
147+ res. push ( df_timer ( format ! ( "{name}_p95" ) , p95) ) ;
148+
149+ let p99 = timer. quantile ( 0.99 ) . vortex_expect ( "must not be empty" ) ;
150+ res. push ( df_timer ( format ! ( "{name}_p99" ) , p99) ) ;
142151 }
152+
143153 res
144154 }
145155 // TODO(os): add more metric types when added to VortexMetrics
@@ -165,6 +175,15 @@ fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
165175 }
166176}
167177
178+ fn df_timer ( name : String , value : Duration ) -> DatafusionMetricValue {
179+ let time = Time :: new ( ) ;
180+ time. add_duration ( value) ;
181+ DatafusionMetricValue :: Time {
182+ name : name. into ( ) ,
183+ time,
184+ }
185+ }
186+
168187#[ expect(
169188 clippy:: cast_possible_truncation,
170189 reason = "truncation is checked before cast"
@@ -174,3 +193,74 @@ fn f_to_u(f: f64) -> Option<usize> {
174193 // After the range check, truncation is guaranteed to keep the value in usize bounds.
175194 f. trunc ( ) as usize )
176195}
196+
#[cfg(test)]
mod tests {
    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that tallies the `DataSourceExec` nodes it is shown.
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        /// Bumps the tally for a `DataSourceExec` node and reports `false` for it;
        /// reports `true` for every other node.
        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
            let is_data_source = plan.as_any().is::<DataSourceExec>();
            if is_data_source {
                self.0 += 1;
            }
            Ok(!is_data_source)
        }
    }

    /// The finder must hand back exactly one metrics set for each
    /// `DataSourceExec` node in the physical plan of a simple table scan.
    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        // Register an external Vortex-backed table.
        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'files/'",
            )
            .await?;

        // Insert two rows so the scan has data behind it.
        ctx.session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?
            .collect()
            .await?;

        // Plan (but do not execute) a full scan of the table.
        let scan = ctx.session.sql("SELECT * FROM my_tbl").await?;
        let (state, logical_plan) = scan.into_parts();
        let physical_plan = state.create_physical_plan(&logical_plan).await?;

        // Reference count: how many DataSourceExec nodes the plan contains.
        let mut counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut counter)?;
        let expected = counter.0;

        // Value under test: how many metrics sets the finder collects.
        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());
        let found = metrics_sets.len();

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            found, expected,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            found, expected
        );

        Ok(())
    }
}
0 commit comments