Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions rust/cubestore/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

143 changes: 143 additions & 0 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ pub fn sql_tests(prefix: &str) -> Vec<(&'static str, TestFn)> {
t("planning_inplace_aggregate", planning_inplace_aggregate),
t("planning_hints", planning_hints),
t("planning_inplace_aggregate2", planning_inplace_aggregate2),
t("planning_topk_hash_aggregate", planning_topk_hash_aggregate),
t("topk_hash_aggregate_trim", topk_hash_aggregate_trim),
t("topk_large_inputs", topk_large_inputs),
t("partitioned_index", partitioned_index),
t(
Expand Down Expand Up @@ -386,6 +388,7 @@ lazy_static::lazy_static! {
"create_table_with_csv_no_header_and_delimiter",
"create_table_with_csv_no_header_and_quotes",
"filter_pushdown_unique_key",
"planning_topk_hash_aggregate",
].into_iter().map(ToOwned::to_owned).collect();
}

Expand Down Expand Up @@ -3162,6 +3165,146 @@ async fn planning_inplace_aggregate(service: Box<dyn SqlClient>) -> Result<(), C
Ok(())
}

async fn planning_topk_hash_aggregate(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.exec_query("CREATE SCHEMA s").await?;
service
.exec_query("CREATE TABLE s.Data(url text, day int, hits int)")
.await?;
service
.exec_query("CREATE TABLE s.D3(a int, b int, c int, h int)")
.await?;

// GROUP BY a non-indexed column -> hash (Linear) partial aggregate; ORDER BY the group
// column with a LIMIT -> the worker partial aggregate is replaced by TopKHashAggregate.
let p = service
.plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1 LIMIT 10")
.await?;
let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none());
assert!(
pp.contains("TopKHashAggregate, k: 10, factor: 2,"),
"expected TopKHashAggregate on the worker, got:\n{}",
pp
);

// LIMIT + OFFSET -> k = limit + offset.
let p = service
.plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1 LIMIT 10 OFFSET 5")
.await?;
let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none());
assert!(
pp.contains("TopKHashAggregate, k: 15, factor: 2,"),
"expected k=15 (limit+offset), got:\n{}",
pp
);

// ORDER BY an aggregate (not a group-by column) -> no trim.
let p = service
.plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 2 DESC LIMIT 10")
.await?;
let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none());
assert!(
!pp.contains("TopKHashAggregate"),
"did not expect TopKHashAggregate when ordering by an aggregate, got:\n{}",
pp
);

// No LIMIT -> no trim.
let p = service
.plan_query("SELECT day, SUM(hits) FROM s.Data GROUP BY 1 ORDER BY 1")
.await?;
let pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none());
assert!(
!pp.contains("TopKHashAggregate"),
"did not expect TopKHashAggregate without a limit, got:\n{}",
pp
);

// ORDER BY a proper SUBSET of GROUP BY (b out of b, c). The worker cut and the router sort must
// both use the total order T = [b, c]: the worker trim order carries the tie-break column c, and
// the router's global Sort is extended with c so its top-k matches the global top-k by T.
let p = service
.plan_query("SELECT b, c, SUM(h) FROM s.D3 GROUP BY 1, 2 ORDER BY 1 LIMIT 3")
.await?;
let worker_pp = pp_phys_plan_ext(p.worker.as_ref(), &PPOptions::none());
assert!(
worker_pp.contains("TopKHashAggregate, k: 3, factor: 2,")
&& worker_pp.contains("(0, SortOptions { descending: false, nulls_first: false })")
&& worker_pp.contains("(1, SortOptions { descending: false, nulls_first: true })"),
"expected worker trim order [b, c] totalized, got:\n{}",
worker_pp
);
let router_pp = pp_phys_plan_ext(
p.router.as_ref(),
&PPOptions {
show_sort_by: true,
..PPOptions::none()
},
);
assert!(
router_pp.contains("b@0") && router_pp.contains("c@1"),
"expected router Sort extended with the tie-break column c, got:\n{}",
router_pp
);

Ok(())
}

async fn topk_hash_aggregate_trim(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.exec_query("CREATE SCHEMA s").await?;
service
.exec_query("CREATE TABLE s.Data(a int, b int, hits int)")
.await?;
// 12 distinct (a, b) groups, each with two rows so partial aggregation actually groups.
// With k=3 and factor=2 the trim activates (g=12 > 6) but the result must match a full
// top-k. ORDER BY a (a proper subset of GROUP BY a, b) exercises totalization: the worker
// breaks ties on a by b so the router still receives every needed partial state.
service
.exec_query(
"INSERT INTO s.Data(a, b, hits) VALUES \
(1,1,10),(1,1,5),(1,2,1),(1,2,2),\
(2,1,7),(2,1,3),(2,2,4),(2,2,6),\
(3,1,8),(3,1,2),(3,2,9),(3,2,1),\
(4,1,1),(4,1,1),(4,2,1),(4,2,1),\
(5,1,1),(5,1,1),(5,2,1),(5,2,1),\
(6,1,1),(6,1,1),(6,2,1),(6,2,1)",
)
.await?;

// ORDER BY a, b LIMIT 3 (ascending): smallest three groups by (a, b).
let r = service
.exec_query("SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1, 2 LIMIT 3")
.await?;
assert_eq!(to_rows(&r), rows(&[(1, 1, 15), (1, 2, 3), (2, 1, 10)]));

// ORDER BY a, b DESC LIMIT 3: largest three groups by (a, b).
let r = service
.exec_query(
"SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1 DESC, 2 DESC LIMIT 3",
)
.await?;
assert_eq!(to_rows(&r), rows(&[(6, 2, 2), (6, 1, 2), (5, 2, 2)]));

// ORDER BY a only (a proper subset of GROUP BY a, b), LIMIT 2. The selected group SET is
// deterministic (both groups of a=1), but the intra-tie row order is not, so assert as a set.
// Each returned group must carry its complete sum regardless of cross-worker tie-breaking,
// which is what totalization (append b to the cut order) guarantees.
let r = service
.exec_query("SELECT a, b, SUM(hits) FROM s.Data GROUP BY 1, 2 ORDER BY 1 LIMIT 2")
.await?;
let got = to_rows(&r);
assert_eq!(got.len(), 2, "expected 2 rows, got: {:?}", got);
for expected in rows(&[(1, 1, 15), (1, 2, 3)]) {
assert!(
got.contains(&expected),
"missing {:?} in {:?}",
expected,
got
);
}

Ok(())
}

async fn planning_hints(service: Box<dyn SqlClient>) -> Result<(), CubeError> {
service.exec_query("CREATE SCHEMA s").await?;
service
Expand Down
Loading
Loading