diff --git a/src/ops/query.c b/src/ops/query.c
index 3b08415c..845eebc1 100644
--- a/src/ops/query.c
+++ b/src/ops/query.c
@@ -3963,6 +3963,65 @@ static ray_t* project_table_cols(ray_t* src_tbl, const int64_t* keep_syms,
     return nt;
 }
 
+/* Narrow the result of a computed by-val expression when the AST head is
+ * a known small-output temporal extract — minute/hh/ss/dd/dow/mm (0..59
+ * etc.), doy/yyyy (0..366, year): all fit in I16.
+ *
+ * Why: mk_compile packs composite by-keys into a 16-byte slot. An I64
+ * column for minute() (0..59) blows the budget on q18's
+ * {UserID(8B), minute(8B), SearchPhrase(SYM 2-4B)} → exec_group fallback.
+ * Narrowing to I16 brings the composite under 16 bytes and unlocks the
+ * fused mk_ path while keeping decimal display (U8 prints as 0x2F hex
+ * which is unreadable for a minute value).
+ *
+ * Skips when col has nulls — the I64 null sentinel does not survive a
+ * downcast. Also bails out if any element lies outside the int16_t
+ * range, so an unflagged sentinel or unexpected value is never silently
+ * truncated into a bogus key.
+ *
+ * Returns col itself when nothing was narrowed, otherwise a new I16
+ * vector; col is never released here, so the caller compares the result
+ * against col to decide whether to drop the original. */
+static ray_t* narrow_known_small_extract_result(ray_t* expr, ray_t* col) {
+    if (!col || col->type != RAY_I64 || !ray_is_vec(col)) return col;
+    if (col->attrs & RAY_ATTR_HAS_NULLS) return col;
+    if (!expr || expr->type != RAY_LIST || ray_len(expr) < 1) return col;
+    ray_t** e = (ray_t**)ray_data(expr);
+    if (!e[0] || e[0]->type != -RAY_SYM) return col;
+    ray_t* head = ray_sym_str(e[0]->i64);
+    if (!head) return col;
+    size_t hn = ray_str_len(head);
+    const char* hp = ray_str_ptr(head);
+    int known = 0;
+    if (hn == 6 && memcmp(hp, "minute", 6) == 0) known = 1;
+    else if (hn == 2 && memcmp(hp, "hh", 2) == 0) known = 1;
+    else if (hn == 2 && memcmp(hp, "ss", 2) == 0) known = 1;
+    else if (hn == 2 && memcmp(hp, "dd", 2) == 0) known = 1;
+    else if (hn == 3 && memcmp(hp, "dow", 3) == 0) known = 1;
+    else if (hn == 2 && memcmp(hp, "mm", 2) == 0) known = 1;
+    else if (hn == 3 && memcmp(hp, "doy", 3) == 0) known = 1;
+    else if (hn == 4 && memcmp(hp, "yyyy", 4) == 0) known = 1;
+    if (!known) return col;
+
+    int64_t len = ray_len(col);
+    ray_t* out = ray_vec_new(RAY_I16, len);
+    if (!out || RAY_IS_ERR(out)) return col;
+    out->len = len;
+    const int64_t* src = (const int64_t*)ray_data(col);
+    int16_t* dst = (int16_t*)ray_data(out);
+    for (int64_t i = 0; i < len; i++) {
+        int64_t v = src[i];
+        if (v < INT16_MIN || v > INT16_MAX) {
+            /* Never truncate: an out-of-range element would wrap
+             * into a different, valid-looking key. Keep the I64. */
+            ray_release(out);
+            return col;
+        }
+        dst[i] = (int16_t)v;
+    }
+    return out;
+}
+
 ray_t* ray_select(ray_t** args, int64_t n) {
     if (n < 1) return ray_error("domain", NULL);
     ray_t* dict = args[0];
@@ -4521,6 +4580,15 @@ ray_t* ray_select(ray_t** args, int64_t n) {
                     fail_err = ray_error("length", "by-dict val must be a column vector");
                     failed = true; break;
                 }
+                /* Narrow I64 results of known-small temporal extracts (minute,
+                 * hour, day-of-week, etc.) to I16. Keeps q18-shaped
+                 * composite by-keys under mk_compile's 16-byte budget so they
+                 * fuse instead of falling to exec_group. */
+                ray_t* narrowed = narrow_known_small_extract_result(v, col_vec);
+                if (narrowed != col_vec) {
+                    ray_release(col_vec);
+                    col_vec = narrowed;
+                }
                 ray_t* new_tbl = ray_table_add_col(tbl, k->i64, col_vec);
                 ray_release(col_vec);
                 if (!new_tbl || RAY_IS_ERR(new_tbl)) {
@@ -4583,9 +4651,15 @@ ray_t* ray_select(ray_t** args, int64_t n) {
         int multi_key_vec = by_expr && by_expr->type == RAY_SYM
                             && ray_len(by_expr) >= 1 && ray_len(by_expr) <= 16;
 
-        if (where_expr && by_expr && !nearest_expr
+        /* WHERE may be absent: a fused group with no predicate runs the
+         * worker with a const-true mask (ray_filtered_group accepts a
+         * NULL pred). This routes high-cardinality multi-key group-bys
+         * (q16/q32 — no WHERE, millions of groups) onto the fused mk_
+         * shard path instead of the unfused exec_group fallback, whose
+         * per-row/per-call SYM-lock overhead dominates at scale. */
+        if (by_expr && !nearest_expr
             && (single_key_scalar || multi_key_vec)
-            && ray_fused_group_supported(where_expr, tbl))
+            && (!where_expr || ray_fused_group_supported(where_expr, tbl)))
         {
             /* Walk the dict aggs. Accept any combination of count/sum/
              * min/max/avg with non-COUNT requiring an integer/temporal
@@ -4697,9 +4771,26 @@ ray_t* ray_select(ray_t** args, int64_t n) {
                  * count-only: the multi path's per-row update has higher
                  * overhead than count1. Specifically, count1 owns the
                  * common-case wins. */
-                if (n_keys_local == 1 && n_aggs_ok == 1 && has_only_count) {
+                if (n_keys_local == 1 && n_aggs_ok == 1 && has_only_count
+                    && where_expr) {
+                    /* Single-key count1 only fuses with a WHERE. A
+                     * no-WHERE single key over a near-unique column
+                     * (e.g. q15 UserID, ~10M groups) is faster on the
+                     * unfused radix exec_group than on the count1
+                     * linear-probe shard; keep it there. */
                     can_fuse_phase1 = 1; /* will use count1 exec */
-                } else if (narrow_fits || wide_fits) {
+                } else if ((narrow_fits || wide_fits)
+                           && (where_expr
+                               || (has_only_count && n_keys_local >= 2))) {
+                    /* No-WHERE: only fuse multi-key (≥2) count-only
+                     * shapes. Single-key no-WHERE (even count-only,
+                     * e.g. q15 UserID) and multi-agg (SUM/AVG) over
+                     * near-unique keys (e.g. q32 {WatchID,ClientIP})
+                     * keep per-group state that the unfused radix
+                     * exec_group scatters more cheaply at very high
+                     * cardinality; fusing them there regresses. With
+                     * a WHERE the filtered row count is small enough
+                     * that fusing always wins. */
                     can_fuse_phase1 = 1; /* will use multi exec */
                 }
             }
@@ -6583,7 +6674,8 @@ ray_t* ray_select(ray_t** args, int64_t n) {
                    agg_ops[0] == OP_COUNT) {
             root = ray_filtered_group(g, NULL, key_ops, n_keys,
                                       agg_ops, agg_ins, n_aggs);
-        } else if (can_fuse_phase1 && fused_pred_op != NULL
+        } else if (can_fuse_phase1
+                   && (fused_pred_op != NULL || !where_expr)
                    && n_nonaggs == 0 && agg_kinds_ok
                    && !has_binary_agg && !has_agg_k)
         {
@@ -6591,7 +6683,9 @@ ray_t* ray_select(ray_t** args, int64_t n) {
              * single COUNT) → Pass 3 fast path; everything else →
              * multi path with packed composite key. Skipped when
              * any agg is binary (filtered-group fusion only knows
-             * about unary aggs) or holistic with a K param. */
+             * about unary aggs) or holistic with a K param.
+             * fused_pred_op is NULL when there is no WHERE — the
+             * fused worker then runs a const-true mask. */
             root = ray_filtered_group(g, fused_pred_op, key_ops, n_keys,
                                       agg_ops, agg_ins, n_aggs);