
feat(cubestore): trim worker partial hash aggregate to top-k groups#11024

Open
waralexrom wants to merge 1 commit into master from cubestore-hash-aggregate-limit


@waralexrom
Member

Summary

Lets the worker-side partial hash aggregate trim its output to the query LIMIT before shipping partial states to the router, so far fewer rows cross the network for GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k queries. The router stays unchanged in shape (Final + Sort + Limit).

Changes

  • Add TopKHashAggregateExec (+ stream) in queryplanner/topk_aggregate/: a worker-side partial hash aggregate that builds the full group table and, at emit, keeps only the k smallest groups by a total order T = ORDER BY ++ remaining group-by columns when local groups exceed factor * k. It reuses DataFusion's GroupValues/GroupsAccumulator and owns the consume/emit loop (no spill / no early-emit), so the only fork change is making new_group_values public.
  • Add optimizations/topk_aggregate_rewriter.rs: recognizes the exact Sort(/Limit) → Final agg → ClusterSend/Worker → Partial hash agg chain (bails on HAVING filter / nested aggregate / computed projection), replaces the partial with TopKHashAggregateExec, and also appends T's tie-break tail to the router's global Sort so the router selects by the same total order (required for correctness; safe because ORDER BY is a prefix of T).
  • Config partial_hash_aggregate_topk_factor (default 2, env CUBESTORE_PARTIAL_HASH_AGGREGATE_TOPK_FACTOR), threaded ConfigObj → QueryExecutorImpl → PreOptimizeRule.
  • Pretty-printer support for the new exec.
  • Points the datafusion deps at the cubestore-hash-aggregate-limit fork branch (one-line pub new_group_values change).
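
To make the trim rule above concrete, here is a minimal sketch of the worker-side cut, assuming toy `i64` group keys rather than DataFusion's `GroupValues`. The names `SortCol`, `total_order`, `cmp_by` and `trim_groups` are hypothetical and do not appear in the PR; only the rule itself (T = ORDER BY ++ remaining group-by columns, trim only when groups exceed `factor * k`) comes from the description.

```rust
use std::cmp::Ordering;

/// A group key as a row of i64 columns (toy stand-in for real group values).
type Key = Vec<i64>;

/// One column of the total order T: group-by column index plus direction.
#[derive(Clone, Copy)]
struct SortCol {
    idx: usize,
    descending: bool,
}

/// Build T = ORDER BY columns ++ remaining group-by columns (ascending tail).
fn total_order(order_by: &[SortCol], num_group_cols: usize) -> Vec<SortCol> {
    let mut t = order_by.to_vec();
    for idx in 0..num_group_cols {
        if !order_by.iter().any(|c| c.idx == idx) {
            t.push(SortCol { idx, descending: false });
        }
    }
    t
}

/// Compare two keys column by column under T.
fn cmp_by(t: &[SortCol], a: &Key, b: &Key) -> Ordering {
    for c in t {
        let ord = a[c.idx].cmp(&b[c.idx]);
        let ord = if c.descending { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Keep the k smallest groups by T, but only when the worker holds
/// more than factor * k groups; otherwise ship everything unchanged.
fn trim_groups(
    mut groups: Vec<(Key, i64)>,
    t: &[SortCol],
    k: usize,
    factor: usize,
) -> Vec<(Key, i64)> {
    if groups.len() <= factor.saturating_mul(k) {
        return groups;
    }
    groups.sort_by(|a, b| cmp_by(t, &a.0, &b.0));
    groups.truncate(k);
    groups
}
```

Because every group-by column appears in T, two distinct groups never compare equal, which is what makes the cut deterministic across workers.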

Note

Depends on the cube-js/arrow-datafusion PR that makes new_group_values public (this branch's Cargo.toml points at the fork branch cubestore-hash-aggregate-limit). Retarget to the merged ref before merging.

Testing

  • cargo test -p cubestore-sql-tests --test in-process: 171 passed, 0 failed.
  • New planning_topk_hash_aggregate (plan): trim attaches; k = limit + offset; no trim when ordering by an aggregate / without a limit; subset ORDER BY totalizes the worker cut and extends the router sort with the tie-break column.
  • New topk_hash_aggregate_trim (results): ASC/DESC full-key + proper-subset ORDER BY (asserted order-independently).
  • TODO before merge: run the cluster (multi-worker) and migration suites in CI.
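
The `k = limit + offset` case in the plan test can be illustrated with a small sketch. It assumes the router applies OFFSET and then LIMIT to the globally sorted groups; `worker_k` and `router_page` are hypothetical names used only for illustration.

```rust
/// Rows each worker must retain so the router can still apply OFFSET then LIMIT.
fn worker_k(limit: usize, offset: usize) -> usize {
    limit.saturating_add(offset)
}

/// Router side: skip `offset` rows of the globally sorted keys, then take `limit`.
fn router_page(sorted: &[i64], limit: usize, offset: usize) -> Vec<i64> {
    sorted.iter().copied().skip(offset).take(limit).collect()
}
```

Trimming to `limit` alone would drop rows that the router needs to skip past, so the requested page could come back short or empty.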

Add TopKHashAggregateExec: a worker-side partial hash aggregate that trims
its output to the query LIMIT before shipping partial states to the router,
for `GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k`.

It reuses DataFusion's GroupValues/GroupsAccumulator building blocks and owns
the consume/emit loop (no spill, no early-emit, full table emitted once), so
the only fork change is making `new_group_values` public. The trim keeps the
k smallest groups by a TOTAL order T = ORDER BY ++ remaining group-by columns
when local groups exceed `factor * k` (configurable via ConfigObj /
CUBESTORE_PARTIAL_HASH_AGGREGATE_TOPK_FACTOR, default 2).
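
As a rough illustration of the config knob, the sketch below reads the environment variable with a default of 2. It is not the actual ConfigObj plumbing: `parse_topk_factor` and `topk_factor_from_env` are hypothetical helpers, and falling back to the default on an unparsable value is an assumption, since the PR does not say how invalid input is handled.

```rust
/// Default taken from the PR description.
const DEFAULT_TOPK_FACTOR: usize = 2;

/// Parse the raw value; fall back to the default when it is missing or
/// not a valid unsigned integer (assumed behaviour, not confirmed by the PR).
fn parse_topk_factor(raw: Option<&str>) -> usize {
    raw.and_then(|s| s.trim().parse::<usize>().ok())
        .unwrap_or(DEFAULT_TOPK_FACTOR)
}

/// Read the factor from the environment variable named in the PR.
fn topk_factor_from_env() -> usize {
    let raw = std::env::var("CUBESTORE_PARTIAL_HASH_AGGREGATE_TOPK_FACTOR").ok();
    parse_topk_factor(raw.as_deref())
}
```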

Correctness uses T in two places that must agree: the worker cut and the
router's global Sort+Limit (the rewriter appends T's tail to the router sort,
which is safe since ORDER BY is a prefix of T). The rewriter only fires on the
exact Sort -> Final agg -> ClusterSend/Worker -> Partial hash agg chain and
bails on HAVING filters, nested aggregates or computed projections.

Points cubestore at the cube-js/arrow-datafusion `cubestore-hash-aggregate-limit`
branch for the `pub new_group_values` change.
@waralexrom waralexrom requested a review from a team as a code owner June 5, 2026 22:09
@github-actions github-actions Bot added the `cube store` (Issues relating to Cube Store) and `rust` (Pull requests that update Rust code) labels Jun 5, 2026
@claude
Contributor

claude Bot commented Jun 5, 2026

Claude finished @waralexrom's task in 2m 18s.


Code Review — PR #11024

Verdict: looks solid; 0 high, 2 medium, 3 low.
  • Gather context
  • Read new files (topk_aggregate, rewriter, config, executor, tests)
  • Analyze correctness, perf, edge cases
  • Post inline comments + summary

Summary

The core correctness story (total order T = ORDER BY ++ remaining group-by columns, applied identically at the worker cut and the router sort) is sound. Each worker's groups are a subset of all groups, so for any group in the global top-k by T, its local rank on every worker that holds a partial state for it is ≤ its global rank ≤ k; every partial state the router needs therefore survives the cut and reaches it. The rewriter is appropriately conservative: it only fires on the exact Sort(/Limit) → Final → ClusterSend/Worker → Partial hash chain and bails on HAVING / nested aggregates / computed projections. Config plumbing, pretty-printer, and tests (plan + result) are all in place.
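
The rank argument can be checked on a toy model. The sketch below assumes single-column `i64` keys whose natural ascending order plays the role of T, a SUM aggregate, and no `factor` threshold; `Partials`, `worker_cut` and `router_top_k` are hypothetical names, not code from the PR.

```rust
use std::collections::BTreeMap;

/// Per-worker partial states: group key -> partial SUM.
/// BTreeMap iterates keys in ascending order, which stands in for T here.
type Partials = BTreeMap<i64, i64>;

/// Worker cut: keep only the k smallest keys by T.
fn worker_cut(p: &Partials, k: usize) -> Partials {
    p.iter().take(k).map(|(key, v)| (*key, *v)).collect()
}

/// Router: merge partial states from all workers (Final aggregate),
/// then Sort + Limit by the same order T.
fn router_top_k(workers: &[Partials], k: usize) -> Vec<(i64, i64)> {
    let mut merged = Partials::new();
    for w in workers {
        for (key, v) in w {
            *merged.entry(*key).or_insert(0) += *v;
        }
    }
    merged.into_iter().take(k).collect()
}
```

Feeding the router the trimmed partials or the full partials should give the same top-k rows, including the merged sums for groups that live on more than one worker.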

The worker still builds the full group table — the win is network bytes, not worker memory. The PR description acknowledges this; just naming it here so the perf framing is clear.

Findings (inline)

Medium

  • descend_to_worker_partial does not reject a second ClusterSend/Worker boundary — crossed_boundary is set-once and is_row_passthrough also treats those nodes as passthrough, so a future plan shape with nested boundaries could fire the rewriter on the wrong Partial and break the "one cut = global top-k by T" invariant. Tighten to if crossed_boundary { return None; } or drop the boundary classes from is_row_passthrough.
  • No MetricsSet is exposed on TopKHashAggregateExec (metrics() returns None). Production tuning of factor and confirmation that the rewriter actually fires will be hard without pre_trim_groups / emitted_groups / trim_triggered counters.

Low

  • Tie-break tail in analyze uses SortOptions::default() (asc, nulls_first=true) regardless of the leading ORDER BY direction. Correctness-safe; worth a one-line comment so a future reader doesn't mistake it for a bug.
  • router_tail name lookup via sort_input_schema.index_of(name) relies implicitly on "group cols are at the prefix of the Final output schema." True today; a comment asserting the invariant would harden the rewriter.
  • aggregate_expressions(...) carries an unused _col_idx_base that mirrors a parameter DataFusion uses to offset column indices for Partial mode — easy to misread as a port omission. Drop it or document why it's intentionally dropped.

Tests

Plan test covers: trim attaches, k = limit + offset, no trim when ordering by an aggregate or without a limit, subset ORDER BY totalizes both worker cut and router sort. Result test covers ASC / DESC full-key and proper-subset orderings (set-asserted on the subset case). Looks adequate. The TODO to run cluster and migration suites pre-merge from the PR description is the right call.

· branch: `cubestore-hash-aggregate-limit`

continue;
}
let name = &group_names[idx];
let options = SortOptions::default();

Tie-break sort options are always SortOptions::default() (asc, nulls_first=true) for both the worker trim and the router-sort tail. This is correctness-safe (the appended columns are tie-breakers, the user's ORDER BY prefix is preserved), but it means the previously-unspecified ordering of tied rows now becomes deterministic in a particular direction that doesn't follow the leading ORDER BY direction (e.g. a DESC query produces asc tie-breaks). Worth a one-line comment near here making this contract explicit so the next reader doesn't worry the tail should mirror the leading direction.
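
A small sketch of what this contract looks like for a DESC query, assuming toy `(day, city)` rows and ignoring NULL handling; `sort_desc_with_asc_tail` is a hypothetical helper, not code from the PR.

```rust
/// Sort (day, city) rows by day DESC with an ascending tie-break on city,
/// mirroring a default (ascending) tail appended after a DESC ORDER BY column.
fn sort_desc_with_asc_tail(rows: &mut [(i64, &str)]) {
    rows.sort_by(|a, b| b.0.cmp(&a.0).then_with(|| a.1.cmp(b.1)));
}
```

Rows with the same `day` come out in ascending `city` order even though the leading column is descending, which is the behaviour the suggested comment would document.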

Comment on lines +217 to +232
fn descend_to_worker_partial(mut node: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
    let mut crossed_boundary = false;
    loop {
        if let Some(agg) = node.as_any().downcast_ref::<AggregateExec>() {
            return (crossed_boundary && *agg.mode() == AggregateMode::Partial)
                .then_some(node.clone());
        }
        if node.as_any().is::<ClusterSendExec>() || node.as_any().is::<WorkerExec>() {
            crossed_boundary = true;
            node = child(&node)?;
        } else if is_row_passthrough(&node) {
            node = child(&node)?;
        } else {
            return None;
        }
    }

crossed_boundary is a bool but the descent doesn't reject crossing a second boundary. Once crossed_boundary is true, the loop happily walks through another ClusterSendExec/WorkerExec (either via this branch — which re-sets the same flag — or via is_row_passthrough, which also treats them as passthrough). In current Cubestore plans nested cluster boundaries shouldn't appear, but if they ever did (e.g. via a future plan shape), this rewriter would still fire on the inner Partial and the "one cut equals global top-k by T" invariant would no longer hold. Either reject the second boundary explicitly (if crossed_boundary { return None; }) or remove ClusterSendExec/WorkerExec from is_row_passthrough so the only path across them is this controlled branch.
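
The first option (reject the second boundary explicitly) could look roughly like the sketch below. It uses a toy `Node` enum as a hypothetical stand-in for `Arc<dyn ExecutionPlan>`, and it assumes boundary nodes are only reachable through the controlled `Boundary` branch, so it shows the shape of the check rather than a drop-in patch.

```rust
/// Toy plan node; a hypothetical stand-in for Arc<dyn ExecutionPlan>.
enum Node {
    /// ClusterSendExec / WorkerExec.
    Boundary(Box<Node>),
    /// Any row-passthrough operator (boundaries excluded).
    Passthrough(Box<Node>),
    PartialAgg,
    FinalAgg,
    Other,
}

/// True only if exactly one boundary is crossed before reaching a Partial aggregate.
fn reaches_worker_partial(mut node: &Node) -> bool {
    let mut crossed_boundary = false;
    loop {
        match node {
            Node::PartialAgg => return crossed_boundary,
            Node::Boundary(child) => {
                if crossed_boundary {
                    // Second boundary: refuse to rewrite.
                    return false;
                }
                crossed_boundary = true;
                node = &**child;
            }
            Node::Passthrough(child) => node = &**child,
            Node::FinalAgg | Node::Other => return false,
        }
    }
}
```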

}
let name = &group_names[idx];
let options = SortOptions::default();
let sort_col_idx = sort_input_schema.index_of(name).ok()?;

sort_input_schema.index_of(name) resolves by name on the Final aggregate's output schema. Group columns come first in DataFusion's aggregate output so this is correct in practice, but it's fragile if an aggregate output is ever aliased to the same name as a group-by column (e.g. SUM(day) AS day GROUP BY day — unusual but legal in some plans) — index_of returns the first match, which is what we want here, but the invariant is implicit. A comment asserting "group cols are at the prefix of Final's output schema" would be cheap insurance.
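
One way to turn the implicit invariant into an explicit check is sketched below, using a plain slice of field names as a toy schema. `index_of` here is a local first-match helper modelling the behaviour described above, and `group_col_index` is hypothetical; neither is the Arrow API.

```rust
/// First index whose field name matches; toy model of a first-match lookup by name.
fn index_of(fields: &[&str], name: &str) -> Option<usize> {
    fields.iter().position(|f| *f == name)
}

/// Resolve a group column by name and reject any hit outside the group-column
/// prefix, making the "group cols come first" assumption explicit.
fn group_col_index(fields: &[&str], num_group_cols: usize, name: &str) -> Option<usize> {
    let idx = index_of(fields, name)?;
    (idx < num_group_cols).then_some(idx)
}
```

With a duplicated name such as `["day", "city", "day"]` the lookup still lands on the group column, and a name that only matches an aggregate output is refused instead of silently used.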

fn aggregate_expressions(
aggr_expr: &[Arc<AggregateFunctionExpr>],
_col_idx_base: usize,
) -> DFResult<Vec<Vec<Arc<dyn PhysicalExpr>>>> {

_col_idx_base is unused. Either remove the parameter or use it (the DataFusion original threads num_group_exprs into argument resolution for AggregateMode::Partial to offset column indices, and dropping it without comment is easy to misread as a port bug). Minor.

Ok(Box::pin(stream))
}

fn metrics(&self) -> Option<MetricsSet> {

No metrics are exposed (metrics() returns None). Production diagnosis of "did the trim actually fire on this worker and by how much?" will be a lot easier with even minimal MetricsSet counters (pre_trim_groups, emitted_groups, trim_triggered). The current implementation builds the full table either way, so users can't tell from query metrics whether factor is well-tuned or whether the rewriter even matched.
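
For illustration, the three counters could be tracked along the lines of the sketch below. It uses plain atomics in a hypothetical `TrimMetrics` struct rather than DataFusion's `MetricsSet`, so wiring the values into `metrics()` is left out; only the counter names come from the suggestion above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical counters; names taken from the review suggestion.
#[derive(Default)]
struct TrimMetrics {
    pre_trim_groups: AtomicUsize,
    emitted_groups: AtomicUsize,
    trim_triggered: AtomicUsize,
}

impl TrimMetrics {
    /// Record one emit: how many groups the table held and how many were shipped.
    fn record_emit(&self, pre_trim: usize, emitted: usize) {
        self.pre_trim_groups.fetch_add(pre_trim, Ordering::Relaxed);
        self.emitted_groups.fetch_add(emitted, Ordering::Relaxed);
        if emitted < pre_trim {
            self.trim_triggered.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Read (pre_trim_groups, emitted_groups, trim_triggered).
    fn snapshot(&self) -> (usize, usize, usize) {
        (
            self.pre_trim_groups.load(Ordering::Relaxed),
            self.emitted_groups.load(Ordering::Relaxed),
            self.trim_triggered.load(Ordering::Relaxed),
        )
    }
}
```

Comparing `pre_trim_groups` with `emitted_groups` per query would show whether `factor` is letting the trim fire at all.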
