Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Metrics implements IMetrics {
public long[] numReplacedWithDynamicFilterRows;
public long[] numDynamicFilterInputRows;
public long[] flushRowCount;
public long[] abandonedPartialAggregation;
public long[] loadedToValueHook;
public long[] bloomFilterBlocksByteSize;
public long[] skippedSplits;
Expand Down Expand Up @@ -94,6 +95,7 @@ public Metrics(
long[] numReplacedWithDynamicFilterRows,
long[] numDynamicFilterInputRows,
long[] flushRowCount,
long[] abandonedPartialAggregation,
long[] loadedToValueHook,
long[] bloomFilterBlocksByteSize,
long[] scanTime,
Expand Down Expand Up @@ -140,6 +142,7 @@ public Metrics(
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
this.abandonedPartialAggregation = abandonedPartialAggregation;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
Expand Down Expand Up @@ -192,6 +195,7 @@ public OperatorMetrics getOperatorMetrics(int index) {
numReplacedWithDynamicFilterRows[index],
numDynamicFilterInputRows[index],
flushRowCount[index],
abandonedPartialAggregation[index],
loadedToValueHook[index],
bloomFilterBlocksByteSize[index],
scanTime[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class OperatorMetrics implements IOperatorMetrics {
public long numReplacedWithDynamicFilterRows;
public long numDynamicFilterInputRows;
public long flushRowCount;
public long abandonedPartialAggregation;
public long loadedToValueHook;
public long bloomFilterBlocksByteSize;
public long skippedSplits;
Expand Down Expand Up @@ -87,6 +88,7 @@ public OperatorMetrics(
long numReplacedWithDynamicFilterRows,
long numDynamicFilterInputRows,
long flushRowCount,
long abandonedPartialAggregation,
long loadedToValueHook,
long bloomFilterBlocksByteSize,
long scanTime,
Expand Down Expand Up @@ -131,6 +133,7 @@ public OperatorMetrics(
this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows;
this.numDynamicFilterInputRows = numDynamicFilterInputRows;
this.flushRowCount = flushRowCount;
this.abandonedPartialAggregation = abandonedPartialAggregation;
this.loadedToValueHook = loadedToValueHook;
this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize;
this.skippedSplits = skippedSplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of spilled partitions"),
"aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of spilled files"),
"flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of flushed rows"),
"abandonedPartialAggregation" -> SQLMetrics.createMetric(
sparkContext,
"number of abandoned partial aggregations"),
"loadedToValueHook" -> SQLMetrics.createMetric(
sparkContext,
"number of pushdown aggregations"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
val aggSpilledPartitions: SQLMetric = metrics("aggSpilledPartitions")
val aggSpilledFiles: SQLMetric = metrics("aggSpilledFiles")
val flushRowCount: SQLMetric = metrics("flushRowCount")
val abandonedPartialAggregation: SQLMetric = metrics("abandonedPartialAggregation")
val loadedToValueHook: SQLMetric = metrics("loadedToValueHook")

val rowConstructionCpuCount: SQLMetric = metrics("rowConstructionCpuCount")
Expand Down Expand Up @@ -81,6 +82,7 @@ class HashAggregateMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
aggSpilledPartitions += aggMetrics.spilledPartitions
aggSpilledFiles += aggMetrics.spilledFiles
flushRowCount += aggMetrics.flushRowCount
abandonedPartialAggregation += aggMetrics.abandonedPartialAggregation
loadedToValueHook += aggMetrics.loadedToValueHook
idx += 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ object MetricsUtil extends Logging {
var numReplacedWithDynamicFilterRows: Long = 0
var numDynamicFilterInputRows: Long = 0
var flushRowCount: Long = 0
var abandonedPartialAggregation: Long = 0
var loadedToValueHook: Long = 0
var bloomFilterBlocksByteSize: Long = 0
var scanTime: Long = 0
Expand Down Expand Up @@ -159,6 +160,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows += metrics.numReplacedWithDynamicFilterRows
numDynamicFilterInputRows += metrics.numDynamicFilterInputRows
flushRowCount += metrics.flushRowCount
abandonedPartialAggregation += metrics.abandonedPartialAggregation
loadedToValueHook += metrics.loadedToValueHook
bloomFilterBlocksByteSize += metrics.bloomFilterBlocksByteSize
scanTime += metrics.scanTime
Expand Down Expand Up @@ -203,6 +205,7 @@ object MetricsUtil extends Logging {
numReplacedWithDynamicFilterRows,
numDynamicFilterInputRows,
flushRowCount,
abandonedPartialAggregation,
loadedToValueHook,
bloomFilterBlocksByteSize,
scanTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -189,6 +189,29 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}

test("Hash aggregate metrics include abandoned partial aggregation") {
withSQLConf(
VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_ROWS.key -> "0",
VeloxConfig.ABANDON_PARTIAL_AGGREGATION_MIN_PCT.key -> "0") {
runQueryAndCompare("SELECT c2, sum(c1) FROM metrics_t1 GROUP BY c2") {
df =>
val aggregates = collect(df.queryExecution.executedPlan) {
case agg: HashAggregateExecBaseTransformer => agg
}
assert(aggregates.nonEmpty)
val numTotalAbandonedPartialAggregations = aggregates.map {
agg =>
val metrics = agg.metrics
assert(metrics.contains("abandonedPartialAggregation"))
val num = metrics("abandonedPartialAggregation").value
assert(num >= 0)
num
}.sum
assert(numTotalAbandonedPartialAggregations > 0)
}
}
}

test("Metrics of noop filter's children") {
runQueryAndCompare("SELECT c1, c2 FROM metrics_t1 where c1 < 50") {
df =>
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
env,
metricsBuilderClass,
"<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -595,6 +595,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kNumReplacedWithDynamicFilterRows],
longArray[Metrics::kNumDynamicFilterInputRows],
longArray[Metrics::kFlushRowCount],
longArray[Metrics::kAbandonedPartialAggregation],
longArray[Metrics::kLoadedToValueHook],
longArray[Metrics::kBloomFilterBlocksByteSize],
longArray[Metrics::kScanTime],
Expand Down
1 change: 1 addition & 0 deletions cpp/core/utils/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct Metrics {
kNumReplacedWithDynamicFilterRows,
kNumDynamicFilterInputRows,
kFlushRowCount,
kAbandonedPartialAggregation,
kLoadedToValueHook,
kBloomFilterBlocksByteSize,
kScanTime,
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted";
const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows";
const std::string kDynamicFilterInputRows = "dynamicFilterInputRows";
const std::string kFlushRowCount = "flushRowCount";
const std::string kAbandonedPartialAggregation = "abandonedPartialAggregation";
const std::string kLoadedToValueHook = "loadedToValueHook";
const std::string kBloomFilterBlocksByteSize = "bloomFilterSize";
const std::string kTotalScanTime = "totalScanTime";
Expand Down Expand Up @@ -503,6 +504,8 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] =
runtimeMetric("sum", second->customStats, kDynamicFilterInputRows);
metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount);
metrics_->get(Metrics::kAbandonedPartialAggregation)[metricIndex] =
runtimeMetric("sum", second->customStats, kAbandonedPartialAggregation);
metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] =
runtimeMetric("sum", second->customStats, kLoadedToValueHook);
metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] =
Expand Down
Loading