Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8c4f7ed
revert: remove fraudulent profiling-gated result caches
singaraiona May 26, 2026
d4da302
refactor(query): remove benchmark-shaped query fast-paths
singaraiona May 26, 2026
240c938
perf(group): early-abort the DA-path min/max probe on doomed key spans
hetoku May 22, 2026
e825f84
perf(group): fused radix HT — per-(worker, partition) direct insert
hetoku May 25, 2026
b5c9ce4
perf(group): extend per-partition path to SUM/AVG aggregators
hetoku May 25, 2026
00bdcd8
fix(group): minmax early-abort check fires within morsels, not at bou…
hetoku May 26, 2026
9bac421
perf(group): skip accum_from_entry when the HT layout has no agg state
hetoku May 26, 2026
f176d47
perf(fused_group): pre-size worker shards by nrows heuristic
hetoku May 26, 2026
bee98d4
feat(group): HyperLogLog approximate count-distinct kernel
singaraiona May 26, 2026
008f690
feat(idx): per-chunk min/max zone index + filter chunk-skip
singaraiona May 26, 2026
2eb01e8
revert: remove fraudulent profiling-gated do_null_cache
singaraiona May 26, 2026
c91b384
revert: remove fraudulent reduce-result cache
singaraiona May 26, 2026
c93612e
refactor: drop unused env-generation counter
singaraiona May 26, 2026
2d4087d
feat(hll): sparse-representation sketch + per-group routing at high g…
singaraiona May 26, 2026
bde9c9e
feat(idx): hash-index point-lookup fast path for eq filters
singaraiona May 26, 2026
bfd8cf5
feat(query): top-K heap extraction for sorted+take group-by
singaraiona May 26, 2026
9d974a0
perf(fused_group): gate hash-index dispatch on single-predicate filters
singaraiona May 26, 2026
c33d163
perf(query): allow wide-key (>8B) group-by to fuse with any agg mix
singaraiona May 26, 2026
7da706d
feat(hll): streaming per-group HLL — skip idx_buf scatter
singaraiona May 26, 2026
92ed6da
perf(query): evaluate by-expression keys under selection
May 26, 2026
e11623a
perf(query): route small-n_groups count-distinct through streaming HLL
singaraiona May 26, 2026
e178099
perf(fused_group): Misra-Gries top-K for I64 / TIMESTAMP key columns
singaraiona May 26, 2026
e5639a1
fix(fused_group): properly accept TIMESTAMP for I64 MG top-K
singaraiona May 26, 2026
2702cb7
perf(query): skip idx_buf scatter when streaming HLL covers all non-aggs
singaraiona May 27, 2026
c0bdb64
perf(query): drop redundant ray_heap_gc inside apply_sort_take
singaraiona May 27, 2026
531b111
perf(query): drop redundant ray_heap_gc inside apply_sort_take topk path
singaraiona May 27, 2026
2585aee
perf: drop 3 more redundant ray_heap_gc calls
singaraiona May 27, 2026
c6522d4
perf(heap): skip empty freelist orders in ray_heap_gc pass 5
singaraiona May 27, 2026
b711b43
perf(heap): skip empty freelist orders in pass 2 foreign-block return
singaraiona May 27, 2026
0db6f8f
perf(fused_group): WIP v2 — per-(worker, partition) shards for multi-key
singaraiona May 27, 2026
ca16c81
perf(fused_group): proper per-partition combine for v2
singaraiona May 27, 2026
0efdcde
perf(fused_group): eager-init v2 partition shards per worker
singaraiona May 27, 2026
06bbea5
perf(query): route count_distinct_per_group_buf through streaming HLL
singaraiona May 27, 2026
706c5be
perf(query): planner rewrite for `(count distinct X) by K` → 2-stage …
singaraiona May 28, 2026
b3cd9c5
fix(query): correct count-distinct alias + SYM empty-string comparison
singaraiona Jun 1, 2026
c1b91fe
fix(query): drop unused saw_key_proj counter
singaraiona Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions src/io/csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "core/pool.h"
#include "lang/format.h"
#include "ops/hash.h"
#include "ops/idxop.h" /* attach per-chunk zone index after load */
#include "store/col.h"
#include "store/fileio.h"
#include "store/splay.h"
Expand Down Expand Up @@ -1227,6 +1228,113 @@ static void csv_parse_serial(const char* buf, size_t buf_size,
}
}

/* Per-column elem size for the hash-attach cap. Mirrors the integer
* shapes accepted by ray_index_attach_hash (BOOL/U8/I16/I32/I64/DATE/
* TIME/TIMESTAMP); returns 0 for floats and dict-backed types so the
* caller skips them. */
static int csv_hash_elem_size(int8_t t) {
switch (t) {
case RAY_BOOL: case RAY_U8: return 1;
case RAY_I16: return 2;
case RAY_I32: case RAY_DATE: return 4;
case RAY_I64: case RAY_TIME: case RAY_TIMESTAMP: return 8;
default: return 0;
}
}

/* Decide whether `v` is a good candidate for an auto-attached hash
* index, using only its (already-attached) chunk_zone as the entropy
* proxy. A column is "random-shaped" when each chunk's [min, max]
* covers more than half the global range — i.e. there's effectively
* no clustering, so the per-chunk zone-skip never excludes a chunk
* and the only way to accelerate `col == K` is by hashing.
*
* The memory cap rejects columns where the hash index (table+chain
* arrays — ~24 bytes/row at default load factor) would be much larger
* than the data itself. We use 5× the column's data bytes as the
* budget: this comfortably admits I32/I64 numeric IDs (where the
* index is 3–5× the data) while still excluding narrow types like
* BOOL/U8/I16 where the index would dwarf the column.
*
* Returns 1 to attach, 0 to skip. */
static int csv_should_attach_hash(ray_t* v) {
if (!v || RAY_IS_ERR(v)) return 0;
int esz = csv_hash_elem_size(v->type);
if (esz == 0) return 0;
/* Need a chunk_zone we can read for entropy estimation. */
if (!(v->attrs & RAY_ATTR_HAS_INDEX) || !v->index) return 0;
ray_index_t* ix = ray_index_payload(v->index);
if (ix->kind != RAY_IDX_CHUNK_ZONE || ix->u.chunk_zone.is_f64) return 0;
uint32_t n_chunks = ix->u.chunk_zone.n_chunks;
if (n_chunks < 4) return 0;
const int64_t* mins = (const int64_t*)ray_data(ix->u.chunk_zone.mins);
const int64_t* maxs = (const int64_t*)ray_data(ix->u.chunk_zone.maxs);

/* Whole-column [gmin, gmax] from the chunk extrema, ignoring empty
* chunks (mn > mx, set by the chunk_zone scan when a chunk is fully
* null). */
int64_t gmin = INT64_MAX, gmax = INT64_MIN;
for (uint32_t g = 0; g < n_chunks; g++) {
if (mins[g] > maxs[g]) continue;
if (mins[g] < gmin) gmin = mins[g];
if (maxs[g] > gmax) gmax = maxs[g];
}
if (gmin == INT64_MAX || gmax == INT64_MIN) return 0;
/* Compute (gmax - gmin) in uint64 space — the signed subtraction
* overflows when the range spans the full I64 width (e.g. UserID
* hashing to both sign halves). Reinterpret as uint64 first;
* 2's-complement wrap gives the correct |gmax - gmin|. */
uint64_t global_range = (uint64_t)gmax - (uint64_t)gmin;
if (global_range == 0) return 0; /* constant column — pointless */

/* Average per-chunk span / global range — selectivity proxy.
* Sum the per-chunk spans as doubles so the accumulation can't
* overflow when chunks span the full I64 width (uint64 sum
* across ~150 chunks each ~1.8e19 wide overflows; double has
* ~15 significant decimal digits, plenty for this coarse ratio).
*
* Threshold = 0.2. The strict 0.5 cut documented in the design
* note cleanly catches uniformly-random hashed columns (ratio
* ~1.0) but excludes mildly-clustered numeric IDs like UserID
* (~0.26 on the ClickBench hits data: user sessions cluster
* consecutively so chunk spans don't fully cover the I64 range).
* For point lookups on those columns chunk_zone still prunes
* most chunks but ~30 % can hold the key — a 30 % full-column
* scan, not a real win. Dropping to 0.2 admits UserID while
* still excluding tightly-clustered keys (CounterID/EventDate
* at <0.01) where chunk_zone already gives 99 %+ pruning. */
double dgr = (double)global_range;
double span_sum = 0.0;
uint32_t n_eff = 0;
for (uint32_t g = 0; g < n_chunks; g++) {
if (mins[g] > maxs[g]) continue;
uint64_t span = (uint64_t)maxs[g] - (uint64_t)mins[g];
span_sum += (double)span;
n_eff++;
}
if (n_eff < 4) return 0;
double mean_ratio = (span_sum / (double)n_eff) / dgr;
if (mean_ratio <= 0.2) return 0;

/* Memory cap: ray_index_attach_hash allocates a power-of-two
* `cap = next_pow2(2*n)` int64 table plus an n-entry int64
* chain. Skip when the index would cost more than 5× the
* column's payload — keeps narrow integer types (where the
* index dwarfs the data) out of the index set while admitting
* I32 / I64 numeric IDs. Done in int64 arithmetic (we cap n
* to anything that would overflow at the row counts we accept). */
int64_t n = v->len;
if (n <= 0) return 0;
uint64_t cap = 8;
uint64_t want = (uint64_t)(2 * n);
while (cap < want) cap <<= 1;
uint64_t aux_bytes = cap * 8u + (uint64_t)n * 8u;
uint64_t data_bytes = (uint64_t)n * (uint64_t)esz;
if (aux_bytes > 5u * data_bytes) return 0;

return 1;
}

static ray_t* csv_materialize_rows(const char* buf, size_t file_size,
const int64_t* row_offsets, int64_t n_rows,
int ncols, char delimiter,
Expand Down Expand Up @@ -1410,6 +1518,36 @@ static ray_t* csv_materialize_rows(const char* buf, size_t file_size,
col_data[c] = dst;
}

/* Per-chunk min/max + null bit on every column big enough to be worth
* indexing — gives the reduce min/max and the filter chunk-skip paths
* an O(n_chunks) scan instead of O(n_rows). Attach is best-effort:
* unsupported types (RAY_STR/RAY_SYM/RAY_GUID in v1) just stay
* unindexed and the consumer falls back to a row scan.
*
* After the chunk_zone attaches we re-walk the same columns and
* upgrade the high-entropy ones to a hash index (the chunk_zone
* stays as well — it's the entropy signal we just measured). See
* csv_should_attach_hash for the selectivity + memory cap. */
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!v || RAY_IS_ERR(v)) continue;
if (v->len < (1 << 16)) continue; /* < one chunk, skip */
ray_t* r = ray_index_attach_chunk_zone(&v, 16);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v; /* attach succeeded */
/* On failure the original column stays in col_vecs[c]; ignore. */
}
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!csv_should_attach_hash(v)) continue;
/* ray_index_attach_hash drops any existing index on the
* column first; the chunk_zone we just built is sacrificed
* for the hash. That's the right trade — once the column
* is known to be high-entropy, chunk-skip never fires
* anyway, so the chunk_zone is dead weight. */
ray_t* r = ray_index_attach_hash(&v);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v;
}

ray_t* tbl = ray_table_new(ncols);
if (!tbl || RAY_IS_ERR(tbl)) {
for (int c = 0; c < ncols; c++) ray_release(col_vecs[c]);
Expand Down Expand Up @@ -1788,6 +1926,25 @@ ray_t* ray_read_csv_named_opts(const char* path, char delimiter, bool header,

/* ---- 11. Build table ---- */
{
/* Best-effort per-chunk zone index attach (see comment on the
* matching loop in build_table_from_cols) — unsupported types
* fall through to the unindexed path inside the consumer.
* Second pass upgrades high-entropy columns to a hash index;
* see csv_should_attach_hash. */
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!v || RAY_IS_ERR(v)) continue;
if (v->len < (1 << 16)) continue;
ray_t* r = ray_index_attach_chunk_zone(&v, 16);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v;
}
for (int c = 0; c < ncols; c++) {
ray_t* v = col_vecs[c];
if (!csv_should_attach_hash(v)) continue;
ray_t* r = ray_index_attach_hash(&v);
if (r && !RAY_IS_ERR(r)) col_vecs[c] = v;
}

ray_t* tbl = ray_table_new(ncols);
if (!tbl || RAY_IS_ERR(tbl)) {
for (int c = 0; c < ncols; c++) ray_release(col_vecs[c]);
Expand Down
14 changes: 0 additions & 14 deletions src/lang/env.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@
#include <stdlib.h>
#include <string.h>

static _Atomic uint64_t g_env_generation = 1;

uint64_t ray_env_generation(void) {
return atomic_load_explicit(&g_env_generation, memory_order_relaxed);
}

static void env_bump_generation_if_user(int is_user) {
if (is_user)
atomic_fetch_add_explicit(&g_env_generation, 1, memory_order_relaxed);
}

/* ---- Function constructors ---- */

/* Builtin name stored inline in nullmap[2..15] (max 13 chars + null).
Expand Down Expand Up @@ -311,7 +300,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
g_env.user[j] = g_env.user[j + 1];
}
g_env.count--;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand All @@ -324,7 +312,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
* flag alone — once user, always user, until the slot is
* deleted. */
if (is_user) g_env.user[i] = 1;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand All @@ -342,7 +329,6 @@ static ray_err_t env_bind_global_impl(int64_t sym_id, ray_t* val, int is_user) {
g_env.vals[g_env.count] = val;
g_env.user[g_env.count] = is_user ? 1 : 0;
g_env.count++;
env_bump_generation_if_user(is_user);
env_unlock();
return RAY_OK;
}
Expand Down
1 change: 0 additions & 1 deletion src/lang/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ static inline const char* ray_fn_name(const ray_t* fn) {
ray_err_t ray_env_init(void);
void ray_env_destroy(void);
ray_t* ray_env_get(int64_t sym_id);
uint64_t ray_env_generation(void);

/* User-facing binder. Refuses any name starting with `.` — that root is
* reserved for system namespaces (.sys, .os, .io, .ipc, …) populated by
Expand Down
109 changes: 0 additions & 109 deletions src/lang/eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -1480,116 +1480,9 @@ ray_t* ray_cond_fn(ray_t** args, int64_t n) {
return make_i64(0);
}

static uint64_t do_cache_mix(uint64_t h, uint64_t v) {
h ^= v + 0x9e3779b97f4a7c15ull + (h << 6) + (h >> 2);
return h ? h : 0x9e3779b97f4a7c15ull;
}

static uint64_t do_cache_hash(ray_t* x) {
if (!x) return 0x1234abcd5678ef00ull;
uint64_t h = do_cache_mix(0xcbf29ce484222325ull, (uint64_t)(uint8_t)x->type);
h = do_cache_mix(h, (uint64_t)x->attrs);
h = do_cache_mix(h, (x->type == -RAY_STR)
? (uint64_t)ray_str_len(x)
: (uint64_t)x->len);
if (x->type == RAY_LIST) {
ray_t** elems = (ray_t**)ray_data(x);
for (int64_t i = 0; i < x->len; i++)
h = do_cache_mix(h, do_cache_hash(elems[i]));
} else if (x->type == RAY_DICT) {
h = do_cache_mix(h, do_cache_hash(ray_dict_keys(x)));
h = do_cache_mix(h, do_cache_hash(ray_dict_vals(x)));
} else if (x->type == RAY_STR) {
for (int64_t i = 0; i < x->len; i++) {
size_t n = 0;
const char* s = ray_str_vec_get(x, i, &n);
for (size_t j = 0; s && j < n; j++)
h = do_cache_mix(h, (unsigned char)s[j]);
}
} else if (x->type == -RAY_STR) {
const char* s = ray_str_ptr(x);
size_t n = ray_str_len(x);
for (size_t i = 0; s && i < n; i++)
h = do_cache_mix(h, (unsigned char)s[i]);
} else if (x->type == RAY_SYM || x->type == -RAY_SYM ||
x->type == RAY_I64 || x->type == -RAY_I64 ||
x->type == RAY_TIMESTAMP || x->type == -RAY_TIMESTAMP) {
h = do_cache_mix(h, (uint64_t)x->i64);
} else if (x->type == RAY_I32 || x->type == -RAY_I32 ||
x->type == RAY_DATE || x->type == -RAY_DATE ||
x->type == RAY_TIME || x->type == -RAY_TIME) {
h = do_cache_mix(h, (uint64_t)(uint32_t)x->i32);
} else if (x->type == RAY_I16 || x->type == -RAY_I16) {
h = do_cache_mix(h, (uint64_t)(uint16_t)x->i16);
} else if (x->type == RAY_U8 || x->type == -RAY_U8 ||
x->type == RAY_BOOL || x->type == -RAY_BOOL) {
h = do_cache_mix(h, (uint64_t)x->u8);
} else if (x->type == RAY_F64 || x->type == -RAY_F64) {
uint64_t bits = 0;
memcpy(&bits, &x->f64, sizeof(bits));
h = do_cache_mix(h, bits);
}
return h;
}

static bool do_cache_contains_set(ray_t* x) {
if (!x || x->type != RAY_LIST) return false;
ray_t** elems = (ray_t**)ray_data(x);
if (x->len > 0 && elems[0] && elems[0]->type == -RAY_SYM) {
ray_t* s = ray_sym_str(elems[0]->i64);
bool is_set = s && ray_str_len(s) == 3 &&
memcmp(ray_str_ptr(s), "set", 3) == 0;
if (s) ray_release(s);
if (is_set) return true;
}
for (int64_t i = 0; i < x->len; i++)
if (do_cache_contains_set(elems[i]))
return true;
return false;
}

static bool do_cache_is_null_name(ray_t* x) {
if (!x || x->type != -RAY_SYM || !(x->attrs & RAY_ATTR_NAME)) return false;
ray_t* s = ray_sym_str(x->i64);
bool ok = s && ray_str_len(s) == 4 && memcmp(ray_str_ptr(s), "null", 4) == 0;
if (s) ray_release(s);
return ok;
}

#define DO_NULL_CACHE_N 2048
static uint64_t g_do_null_cache[DO_NULL_CACHE_N];
static uint64_t g_do_null_cache_env_gen[DO_NULL_CACHE_N];
static uint16_t g_do_null_cache_next = 0;

static bool do_null_cache_get(uint64_t hash) {
if (!hash) return false;
uint64_t env_gen = ray_env_generation();
for (uint16_t i = 0; i < DO_NULL_CACHE_N; i++)
if (g_do_null_cache[i] == hash &&
g_do_null_cache_env_gen[i] == env_gen)
return true;
return false;
}

static void do_null_cache_put(uint64_t hash) {
if (hash) {
uint16_t slot = g_do_null_cache_next++ % DO_NULL_CACHE_N;
g_do_null_cache[slot] = hash;
g_do_null_cache_env_gen[slot] = ray_env_generation();
}
}

/* (do expr1 expr2 ...) — evaluate in sequence, return last. Pushes local scope. */
ray_t* ray_do_fn(ray_t** args, int64_t n) {
if (n == 0) return make_i64(0);
uint64_t null_cache_hash = 0;
if (g_ray_profile.active &&
n == 2 && do_cache_is_null_name(args[1]) &&
!do_cache_contains_set(args[0])) {
null_cache_hash = do_cache_hash(args[0]);
if (do_null_cache_get(null_cache_hash))
return NULL;
}
if (ray_env_push_scope() != RAY_OK) return ray_error("oom", NULL);
ray_t* result = NULL;
for (int64_t i = 0; i < n; i++) {
Expand All @@ -1603,8 +1496,6 @@ ray_t* ray_do_fn(ray_t** args, int64_t n) {
}
}
ray_env_pop_scope();
if (null_cache_hash && result == NULL)
do_null_cache_put(null_cache_hash);
return result;
}

Expand Down
16 changes: 15 additions & 1 deletion src/mem/heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,11 @@ void ray_heap_destroy(void) {
* -------------------------------------------------------------------------- */

static void heap_return_foreign_freelist(ray_heap_t* h) {
/* avail bit (set on insert, cleared on remove) tells us which
* freelist orders have any blocks at all — skip the empty ones. */
if (!h->avail) return;
for (int order = RAY_ORDER_MIN; order < RAY_HEAP_FL_SIZE; order++) {
if (!(h->avail & (1ULL << order))) continue;
ray_fl_head_t* head = &h->freelist[order];
ray_t* blk = head->fl_next;
while (blk != (ray_t*)head) {
Expand Down Expand Up @@ -1473,11 +1477,21 @@ void ray_heap_gc(void) {
/* Pass 5: Release physical pages from free blocks in every
* idle heap. Pass 2 may have returned blocks to worker-owned
* freelists; releasing only the caller heap leaves those worker
* pages resident across large query repetitions. */
* pages resident across large query repetitions.
*
* Use each heap's avail bitmap (set on insert, cleared on
* remove) to skip the entire walk when no order >= 13 has any
* free block. Tiny-query workloads — where the per-statement
* GC fires before any large allocation has been freed —
* complete pass 5 without entering the body. */
uint64_t large_orders_mask = ~((1ULL << 13) - 1);
for (int hid = 0; hid < RAY_HEAP_REGISTRY_SIZE; hid++) {
ray_heap_t* gh = ray_heap_registry[hid];
if (!gh) continue;
uint64_t avail = gh->avail & large_orders_mask;
if (!avail) continue;
for (int i = 13; i < RAY_HEAP_FL_SIZE; i++) {
if (!(avail & (1ULL << i))) continue;
ray_fl_head_t* head = &gh->freelist[i];
ray_t* blk = head->fl_next;
while (blk != (ray_t*)head) {
Expand Down
Loading
Loading