From 9365dded9b51cc2267e2441ab462a1aa83a9b085 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 18 May 2026 13:18:44 +0200 Subject: [PATCH] Add metric abandonedPartialAggregation --- .../org/apache/gluten/metrics/Metrics.java | 4 +++ .../gluten/metrics/OperatorMetrics.java | 3 +++ .../backendsapi/velox/VeloxMetricsApi.scala | 3 +++ .../metrics/HashAggregateMetricsUpdater.scala | 2 ++ .../apache/gluten/metrics/MetricsUtil.scala | 3 +++ .../gluten/execution/VeloxMetricsSuite.scala | 25 ++++++++++++++++++- cpp/core/jni/JniWrapper.cc | 3 ++- cpp/core/utils/Metrics.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 3 +++ 9 files changed, 45 insertions(+), 2 deletions(-) diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 1bcfd67392b..1f4179a45b3 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -42,6 +42,7 @@ public class Metrics implements IMetrics { public long[] numReplacedWithDynamicFilterRows; public long[] numDynamicFilterInputRows; public long[] flushRowCount; + public long[] abandonedPartialAggregation; public long[] loadedToValueHook; public long[] bloomFilterBlocksByteSize; public long[] skippedSplits; @@ -94,6 +95,7 @@ public Metrics( long[] numReplacedWithDynamicFilterRows, long[] numDynamicFilterInputRows, long[] flushRowCount, + long[] abandonedPartialAggregation, long[] loadedToValueHook, long[] bloomFilterBlocksByteSize, long[] scanTime, @@ -140,6 +142,7 @@ public Metrics( this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; this.numDynamicFilterInputRows = numDynamicFilterInputRows; this.flushRowCount = flushRowCount; + this.abandonedPartialAggregation = abandonedPartialAggregation; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; this.skippedSplits = 
skippedSplits; @@ -192,6 +195,7 @@ public OperatorMetrics getOperatorMetrics(int index) { numReplacedWithDynamicFilterRows[index], numDynamicFilterInputRows[index], flushRowCount[index], + abandonedPartialAggregation[index], loadedToValueHook[index], bloomFilterBlocksByteSize[index], scanTime[index], diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index d52245a3340..b9e38642503 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long numReplacedWithDynamicFilterRows; public long numDynamicFilterInputRows; public long flushRowCount; + public long abandonedPartialAggregation; public long loadedToValueHook; public long bloomFilterBlocksByteSize; public long skippedSplits; @@ -87,6 +88,7 @@ public OperatorMetrics( long numReplacedWithDynamicFilterRows, long numDynamicFilterInputRows, long flushRowCount, + long abandonedPartialAggregation, long loadedToValueHook, long bloomFilterBlocksByteSize, long scanTime, @@ -131,6 +133,7 @@ public OperatorMetrics( this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; this.numDynamicFilterInputRows = numDynamicFilterInputRows; this.flushRowCount = flushRowCount; + this.abandonedPartialAggregation = abandonedPartialAggregation; this.loadedToValueHook = loadedToValueHook; this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; this.skippedSplits = skippedSplits; diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 69ed2f09a99..ab4065a8c88 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -311,6 +311,9 @@ class VeloxMetricsApi extends MetricsApi with Logging { "number of spilled partitions"), "aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of spilled files"), "flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of flushed rows"), + "abandonedPartialAggregation" -> SQLMetrics.createMetric( + sparkContext, + "number of abandoned partial aggregations"), "loadedToValueHook" -> SQLMetrics.createMetric( sparkContext, "number of pushdown aggregations"), diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala index 3843a34b835..2ba964e9649 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/HashAggregateMetricsUpdater.scala @@ -44,6 +44,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) val aggSpilledPartitions: SQLMetric = metrics("aggSpilledPartitions") val aggSpilledFiles: SQLMetric = metrics("aggSpilledFiles") val flushRowCount: SQLMetric = metrics("flushRowCount") + val abandonedPartialAggregation: SQLMetric = metrics("abandonedPartialAggregation") val loadedToValueHook: SQLMetric = metrics("loadedToValueHook") val rowConstructionCpuCount: SQLMetric = metrics("rowConstructionCpuCount") @@ -81,6 +82,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) aggSpilledPartitions += aggMetrics.spilledPartitions aggSpilledFiles += aggMetrics.spilledFiles flushRowCount += aggMetrics.flushRowCount + abandonedPartialAggregation += aggMetrics.abandonedPartialAggregation loadedToValueHook += aggMetrics.loadedToValueHook idx += 1 diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala 
b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index f2c9214b069..e8000c31e5f 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -122,6 +122,7 @@ object MetricsUtil extends Logging { var numReplacedWithDynamicFilterRows: Long = 0 var numDynamicFilterInputRows: Long = 0 var flushRowCount: Long = 0 + var abandonedPartialAggregation: Long = 0 var loadedToValueHook: Long = 0 var bloomFilterBlocksByteSize: Long = 0 var scanTime: Long = 0 @@ -159,6 +160,7 @@ object MetricsUtil extends Logging { numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows numDynamicFilterInputRows += metrics.numDynamicFilterInputRows flushRowCount += metrics.flushRowCount + abandonedPartialAggregation += metrics.abandonedPartialAggregation loadedToValueHook += metrics.loadedToValueHook bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize scanTime += metrics.scanTime @@ -203,6 +205,7 @@ object MetricsUtil extends Logging { numReplacedWithDynamicFilterRows, numDynamicFilterInputRows, flushRowCount, + abandonedPartialAggregation, loadedToValueHook, bloomFilterBlocksByteSize, scanTime, diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 35edc4fa6e2..3b947c6acc2 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.gluten.execution -import org.apache.gluten.config.GlutenConfig +import org.apache.gluten.config.{GlutenConfig, VeloxConfig} import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.SparkConf @@ -189,6 +189,29 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa } } + 
test("Hash aggregate metrics include abandoned partial aggregation") { + withSQLConf( + VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_ROWS.key -> "0", + VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_PCT.key -> "0") { + runQueryAndCompare("SELECT c2, sum(c1) FROM metrics_t1 GROUP BY c2") { + df => + val aggregates = collect(df.queryExecution.executedPlan) { + case agg: HashAggregateExecBaseTransformer => agg + } + assert(aggregates.nonEmpty) + val numTotalAbandonedPartialAggregations = aggregates.map { + agg => + val metrics = agg.metrics + assert(metrics.contains("abandonedPartialAggregation")) + val num = metrics("abandonedPartialAggregation").value + assert(num >= 0) + num + }.sum + assert(numTotalAbandonedPartialAggregations > 0) + } + } + } + test("Metrics of noop filter's children") { runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") { df => diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea8..5bbf32c4c51 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -277,7 +277,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { env, metricsBuilderClass, "<init>", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -595,6 +595,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp longArray[Metrics::kNumReplacedWithDynamicFilterRows], longArray[Metrics::kNumDynamicFilterInputRows], longArray[Metrics::kFlushRowCount], + longArray[Metrics::kAbandonedPartialAggregation], longArray[Metrics::kLoadedToValueHook], longArray[Metrics::kBloomFilterBlocksByteSize], longArray[Metrics::kScanTime], diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 
55f2bb4ef50..2cfe85ae3dc 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -69,6 +69,7 @@ struct Metrics { kNumReplacedWithDynamicFilterRows, kNumDynamicFilterInputRows, kFlushRowCount, + kAbandonedPartialAggregation, kLoadedToValueHook, kBloomFilterBlocksByteSize, kScanTime, diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 2c1effba20c..c0c699db7db 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -44,6 +44,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted"; const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows"; const std::string kDynamicFilterInputRows = "dynamicFilterInputRows"; const std::string kFlushRowCount = "flushRowCount"; +const std::string kAbandonedPartialAggregation = "abandonedPartialAggregation"; const std::string kLoadedToValueHook = "loadedToValueHook"; const std::string kBloomFilterBlocksByteSize = "bloomFilterSize"; const std::string kTotalScanTime = "totalScanTime"; @@ -503,6 +504,8 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] = runtimeMetric("sum", second->customStats, kDynamicFilterInputRows); metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount); + metrics_->get(Metrics::kAbandonedPartialAggregation)[metricIndex] = + runtimeMetric("sum", second->customStats, kAbandonedPartialAggregation); metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] = runtimeMetric("sum", second->customStats, kLoadedToValueHook); metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =