Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 85 additions & 6 deletions src/ops/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -3963,6 +3963,50 @@ static ray_t* project_table_cols(ray_t* src_tbl, const int64_t* keep_syms,
return nt;
}

/* Narrow the result of a computed by-val expression from I64 to I16 when
 * the AST head is a known small-output temporal extract: minute / hh / ss /
 * dd / dow / mm (0..59 etc.) and doy / yyyy (0..366, year).
 *
 * Why: mk_compile packs composite by-keys into a 16-byte slot. An I64
 * column for minute() (0..59) blows the budget on q18's
 * {UserID(8B), minute(8B), SearchPhrase(SYM 2-4B)} → exec_group fallback.
 * Narrowing to I16 brings the composite under 16 bytes and unlocks the
 * fused mk_ path while keeping decimal display (U8 prints as 0x2F hex
 * which is unreadable for a minute value).
 *
 * Parameters:
 *   expr - AST of the by-val expression; only its head element is inspected
 *          and it must be a symbol atom naming one of the extracts above.
 *   col  - the evaluated column for that expression.
 *
 * Returns:
 *   `col` itself, unchanged, when narrowing does not apply or is not safe:
 *     - col is NULL, not an I64 vector, or flagged RAY_ATTR_HAS_NULLS
 *       (the I64 null sentinel does not survive a downcast);
 *     - expr is not a non-empty list headed by a known extract symbol;
 *     - any element of col lies outside [INT16_MIN, INT16_MAX];
 *     - the I16 vector cannot be allocated.
 *   Otherwise a newly created I16 vector of the same length holding the
 *   same values. `col` is not released here; callers detect narrowing by
 *   comparing the returned pointer against `col`. */
static ray_t* narrow_known_small_extract_result(ray_t* expr, ray_t* col) {
    /* Head symbols whose results are expected to fit in I16. */
    static const struct {
        const char* name;
        size_t      len;
    } small_heads[] = {
        { "minute", 6 }, { "hh",  2 }, { "ss",  2 }, { "dd",   2 },
        { "dow",    3 }, { "mm",  2 }, { "doy", 3 }, { "yyyy", 4 },
    };

    if (!col || col->type != RAY_I64 || !ray_is_vec(col)) return col;
    if (col->attrs & RAY_ATTR_HAS_NULLS) return col;
    if (!expr || expr->type != RAY_LIST || ray_len(expr) < 1) return col;

    ray_t** e = (ray_t**)ray_data(expr);
    if (!e[0] || e[0]->type != -RAY_SYM) return col;
    ray_t* head = ray_sym_str(e[0]->i64);
    if (!head) return col;

    size_t hn = ray_str_len(head);
    const char* hp = ray_str_ptr(head);
    if (!hp) return col;

    int known = 0;
    for (size_t k = 0; k < sizeof small_heads / sizeof small_heads[0]; k++) {
        if (hn == small_heads[k].len
            && memcmp(hp, small_heads[k].name, hn) == 0) {
            known = 1;
            break;
        }
    }
    if (!known) return col;

    int64_t len = ray_len(col);
    const int64_t* src = (const int64_t*)ray_data(col);

    /* The head symbol is only a hint. Prove the downcast is lossless before
     * committing to it: an out-of-range value (including a null sentinel in
     * a column whose HAS_NULLS flag is not set) would otherwise be silently
     * truncated into a different, valid-looking key. Checking first also
     * means nothing has to be freed on the bail-out path. */
    for (int64_t i = 0; i < len; i++) {
        if (src[i] < INT16_MIN || src[i] > INT16_MAX) return col;
    }

    ray_t* out = ray_vec_new(RAY_I16, len);
    if (!out || RAY_IS_ERR(out)) return col;
    out->len = len;

    int16_t* dst = (int16_t*)ray_data(out);
    for (int64_t i = 0; i < len; i++) dst[i] = (int16_t)src[i];
    return out;
}

ray_t* ray_select(ray_t** args, int64_t n) {
if (n < 1) return ray_error("domain", NULL);
ray_t* dict = args[0];
Expand Down Expand Up @@ -4521,6 +4565,15 @@ ray_t* ray_select(ray_t** args, int64_t n) {
fail_err = ray_error("length", "by-dict val must be a column vector");
failed = true; break;
}
/* Narrow I64 results of known-small temporal extracts (minute,
* hour, day-of-week, etc.) to I16. Keeps q18-shaped
* composite by-keys under mk_compile's 16-byte budget so they
* fuse instead of falling to exec_group. */
ray_t* narrowed = narrow_known_small_extract_result(v, col_vec);
if (narrowed != col_vec) {
ray_release(col_vec);
col_vec = narrowed;
}
ray_t* new_tbl = ray_table_add_col(tbl, k->i64, col_vec);
ray_release(col_vec);
if (!new_tbl || RAY_IS_ERR(new_tbl)) {
Expand Down Expand Up @@ -4583,9 +4636,15 @@ ray_t* ray_select(ray_t** args, int64_t n) {
int multi_key_vec = by_expr && by_expr->type == RAY_SYM
&& ray_len(by_expr) >= 1
&& ray_len(by_expr) <= 16;
if (where_expr && by_expr && !nearest_expr
/* WHERE may be absent: a fused group with no predicate runs the
* worker with a const-true mask (ray_filtered_group accepts a
* NULL pred). This routes high-cardinality multi-key group-bys
* (q16/q32 — no WHERE, millions of groups) onto the fused mk_
* shard path instead of the unfused exec_group fallback, whose
* per-row/per-call SYM-lock overhead dominates at scale. */
if (by_expr && !nearest_expr
&& (single_key_scalar || multi_key_vec)
&& ray_fused_group_supported(where_expr, tbl))
&& (!where_expr || ray_fused_group_supported(where_expr, tbl)))
{
/* Walk the dict aggs. Accept any combination of count/sum/
* min/max/avg with non-COUNT requiring an integer/temporal
Expand Down Expand Up @@ -4697,9 +4756,26 @@ ray_t* ray_select(ray_t** args, int64_t n) {
* count-only: the multi path's per-row update has higher
* overhead than count1. Specifically, count1 owns the
* common-case wins. */
if (n_keys_local == 1 && n_aggs_ok == 1 && has_only_count) {
if (n_keys_local == 1 && n_aggs_ok == 1 && has_only_count
&& where_expr) {
/* Single-key count1 only fuses with a WHERE. A
* no-WHERE single key over a near-unique column
* (e.g. q15 UserID, ~10M groups) is faster on the
* unfused radix exec_group than on the count1
* linear-probe shard; keep it there. */
can_fuse_phase1 = 1; /* will use count1 exec */
} else if (narrow_fits || wide_fits) {
} else if ((narrow_fits || wide_fits)
&& (where_expr
|| (has_only_count && n_keys_local >= 2))) {
/* No-WHERE: only fuse multi-key (≥2) count-only
* shapes. Single-key no-WHERE (even count-only,
* e.g. q15 UserID) and multi-agg (SUM/AVG) over
* near-unique keys (e.g. q32 {WatchID,ClientIP})
* keep per-group state that the unfused radix
* exec_group scatters more cheaply at very high
* cardinality; fusing them there regresses. With
* a WHERE the filtered row count is small enough
* that fusing always wins. */
can_fuse_phase1 = 1; /* will use multi exec */
}
}
Expand Down Expand Up @@ -6583,15 +6659,18 @@ ray_t* ray_select(ray_t** args, int64_t n) {
agg_ops[0] == OP_COUNT) {
root = ray_filtered_group(g, NULL, key_ops, n_keys,
agg_ops, agg_ins, n_aggs);
} else if (can_fuse_phase1 && fused_pred_op != NULL
} else if (can_fuse_phase1
&& (fused_pred_op != NULL || !where_expr)
&& n_nonaggs == 0 && agg_kinds_ok
&& !has_binary_agg && !has_agg_k)
{
/* exec_filtered_group dispatches: count1 (single key,
* single COUNT) → Pass 3 fast path; everything else →
* multi path with packed composite key. Skipped when
* any agg is binary (filtered-group fusion only knows
* about unary aggs) or holistic with a K param. */
* about unary aggs) or holistic with a K param.
* fused_pred_op is NULL when there is no WHERE — the
* fused worker then runs a const-true mask. */
root = ray_filtered_group(g, fused_pred_op,
key_ops, n_keys,
agg_ops, agg_ins, n_aggs);
Expand Down
Loading