Merged
17 changes: 14 additions & 3 deletions README.md
@@ -166,13 +166,15 @@ Every Stratum dataset is a copy-on-write value. Fork one in O(1) to create an is

**ASOF JOIN**: `ASOF [LEFT] JOIN dim ON l.key = r.key AND l.ts >= r.ts` - DuckDB-style syntax. Each probe row matches the closest preceding (or following) build row per partition. Radix-partitioned, parallel, two-pointer merge.
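
The closest-preceding match described above can be sketched as a per-partition two-pointer merge over the timestamp-sorted sides. This is an illustrative sketch with assumed names, not Stratum's actual kernel:

```java
// Hypothetical sketch of the per-partition backward ASOF merge.
// Both arrays must be sorted ascending by timestamp. For each probe row,
// returns the index of the last build row with buildTs <= probeTs, or -1.
// The build cursor only moves forward, so the merge is O(n + m) overall.
public final class AsofMergeSketch {
    public static int[] asofBackward(long[] probeTs, long[] buildTs) {
        int[] match = new int[probeTs.length];
        int b = 0; // build cursor, advances monotonically
        for (int p = 0; p < probeTs.length; p++) {
            while (b < buildTs.length && buildTs[b] <= probeTs[p]) b++;
            match[p] = b - 1; // closest preceding build row, or -1 if none
        }
        return match;
    }
}
```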

**Time-series helpers** (Clojure API): `stratum.api/window-join` (q-style `wj`: aggregate all right rows in `[t+lo, t+hi]` per left row, prefix-sum-accelerated for SUM/AVG/COUNT), `stratum.api/latest-on` (most-recent row per partition, equivalent to `DISTINCT ON`), `stratum.api/generate-series` (dense numeric or temporal spine for gap-filling joins).
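
The prefix-sum acceleration mentioned for `wj` reduces each per-left-row SUM over `[t+lo, t+hi]` to two binary searches and one subtraction. A minimal sketch, assuming sorted right-side timestamps (names are illustrative, not Stratum's API):

```java
import java.util.Arrays;

// Sketch of a prefix-sum-accelerated q-style window join for SUM.
// rightTs must be sorted ascending; prefix[i] holds the sum of rightVal[0..i-1],
// so the sum over index range [a, b) is prefix[b] - prefix[a].
public final class WindowJoinSketch {
    public static double[] windowSum(long[] leftTs, long[] rightTs,
                                     double[] rightVal, long lo, long hi) {
        double[] prefix = new double[rightVal.length + 1];
        for (int i = 0; i < rightVal.length; i++) prefix[i + 1] = prefix[i] + rightVal[i];
        double[] out = new double[leftTs.length];
        for (int i = 0; i < leftTs.length; i++) {
            int a = lowerBound(rightTs, leftTs[i] + lo); // first ts >= t+lo
            int b = upperBound(rightTs, leftTs[i] + hi); // first ts >  t+hi
            out[i] = prefix[b] - prefix[a];              // sum over [t+lo, t+hi]
        }
        return out;
    }
    static int lowerBound(long[] a, long key) {
        int idx = Arrays.binarySearch(a, key);
        if (idx >= 0) { while (idx > 0 && a[idx - 1] == key) idx--; return idx; }
        return -idx - 1;
    }
    static int upperBound(long[] a, long key) {
        int idx = Arrays.binarySearch(a, key);
        if (idx >= 0) { while (idx < a.length - 1 && a[idx + 1] == key) idx++; return idx + 1; }
        return -idx - 1;
    }
}
```

After the O(m) prefix pass, each left row costs O(log m), versus O(window) for a naive rescan.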

**Window functions**: ROW_NUMBER, RANK, DENSE_RANK, NTILE, PERCENT_RANK, CUME_DIST, LAG, LEAD, FIRST_VALUE, LAST_VALUE, NTH_VALUE, SUM/AVG/COUNT/MIN/MAX OVER - with PARTITION BY, ORDER BY, and frame clauses (both `ROWS` and `RANGE BETWEEN INTERVAL ...` for value-distance sliding windows on irregular time series)
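
A `RANGE BETWEEN INTERVAL ... PRECEDING AND CURRENT ROW` frame on an irregular series admits an O(n) running implementation, since the frame start only ever moves forward. A sketch for SUM under that frame shape (illustrative names, not the actual kernel):

```java
// Sketch: RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW sum over an
// ordered but irregularly spaced timestamp column. Maintains a running sum;
// the frame-start cursor advances monotonically, so the whole pass is O(n).
public final class RangeFrameSketch {
    public static double[] rangeSum(long[] ts, double[] val, long intervalMicros) {
        double[] out = new double[ts.length];
        double running = 0;
        int lo = 0; // index of the oldest row still inside the frame
        for (int i = 0; i < ts.length; i++) {
            running += val[i];
            // evict rows older than ts[i] - interval
            while (ts[lo] < ts[i] - intervalMicros) running -= val[lo++];
            out[i] = running;
        }
        return out;
    }
}
```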

**Subqueries and composition**: CTEs (WITH), uncorrelated subqueries (IN/NOT IN), derived tables in FROM

**Expressions**: CASE WHEN, COALESCE, NULLIF, GREATEST, LEAST, CAST, arithmetic (+, -, \*, /, %)

**Date/time**: DATE_TRUNC, DATE_ADD, DATE_DIFF, EXTRACT (year/month/day/hour/minute/second/millisecond/microsecond/day-of-week/week-of-year), TIME_BUCKET, EPOCH_DAYS, EPOCH_SECONDS. TIMESTAMP columns track precision via `:temporal-unit` metadata (`:days` / `:seconds` / `:millis` / `:micros`); the kernels dispatch on the unit, with microseconds the DuckDB-compatible default.
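
One subtlety of the epoch-micros representation: truncation must use floor division, not Java's truncating `/`, or pre-1970 timestamps round the wrong way. A small demo of the distinction (assumed helper name, not part of the API):

```java
// Sketch: why floor division matters for DATE_TRUNC on pre-1970 timestamps.
// With Math.floorDiv, a timestamp one microsecond before the epoch truncates
// to the start of 1969-12-31; truncating division would yield 1970-01-01.
public final class FloorDivDemo {
    static final long MICROS_PER_DAY = 86_400_000_000L;
    public static long truncDay(long epochMicros) {
        return Math.floorDiv(epochMicros, MICROS_PER_DAY) * MICROS_PER_DAY;
    }
}
```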

**String**: LIKE, ILIKE, LENGTH, UPPER, LOWER, SUBSTR (usable in SELECT, WHERE, GROUP BY, ORDER BY)

@@ -230,9 +232,17 @@ The DSL is intentionally flat. Every clause resolves column names by keyword loo
;; Supported predicates: :< :<= :> :>= := :!= :between :in :not-in
;; :like :ilike :is-null :is-not-null :or :not
;; Expressions: [:+ :a :b] [:- :a 1] [:* :price :qty] [:/ :a :b]
;; [:date-trunc :day :ts] [:hour :ts] ;; (or :year :month :millisecond :microsecond, etc.)
;; [:time-bucket 5 :minutes :ts] ;; arbitrary-width bucketing
;; [:coalesce :a 0] [:nullif :a 0]
;; [:greatest :a :b] [:least :a :b]
;; Window ops: :row-number :rank :dense-rank :ntile :percent-rank :cume-dist
;; :sum :count :avg :min :max :lag :lead
;; :first-value :last-value :nth-value
;; :fills (LOCF) :ema :rleid
;; :mavg :msum :mmin :mmax :mcount :mdev ;; q-style sliding aggregates
;; Window frames: {:type :rows :start [N :preceding] :end :current-row}
;; {:type :range :start [interval :preceding] :end :current-row}
```

## Ecosystem
Expand All @@ -255,6 +265,7 @@ All share copy-on-write semantics and can be branched together via Yggdrasil.
- **Data**: CSV/Parquet import, dictionary-encoded strings, PostgreSQL NULL semantics, ad-hoc file queries
- **Integration**: tablecloth/tech.ml.dataset interop, Datahike, Yggdrasil
- **Analytics**: Isolation forest anomaly detection (SQL model management, scoring, online rotation)
- **Time-series**: microsecond-precision TIMESTAMP, RANGE-BETWEEN-INTERVAL frames, TIME_BUCKET, FIRST/LAST/NTH_VALUE, FILLS/LOCF, EMA, RLEID, q-style moving aggregates (MAVG/MSUM/MMIN/MMAX/MDEV), window-join (`wj`) and LATEST ON (DISTINCT ON) helpers

## Architecture

232 changes: 232 additions & 0 deletions src-java/stratum/internal/ColumnOps.java
@@ -2240,6 +2240,238 @@ public static double[] arrayDateDiffSeconds(long[] a, long[] b, int length) {
return r;
}

// =========================================================================
// Microsecond-precision Date/Time Operations
// =========================================================================
// TIMESTAMP columns with :temporal-unit :micros store microseconds since
// 1970-01-01T00:00:00 UTC. Range: ±290k years.
// Conversion factors:
// 1 second = 1_000_000 micros
// 1 millisecond = 1_000 micros
// 1 minute = 60_000_000 micros
// 1 hour = 3_600_000_000 micros
// 1 day = 86_400_000_000 micros
// =========================================================================

static final long MICROS_PER_MILLI = 1_000L;
static final long MICROS_PER_SECOND = 1_000_000L;
static final long MICROS_PER_MINUTE = 60L * MICROS_PER_SECOND;
static final long MICROS_PER_HOUR = 60L * MICROS_PER_MINUTE;
static final long MICROS_PER_DAY = 24L * MICROS_PER_HOUR;

/** DATE_TRUNC to micro (identity): pass-through, no rounding needed. */
public static long[] arrayDateTruncMicroMicros(long[] em, int length) {
return java.util.Arrays.copyOf(em, length);
}

/** DATE_TRUNC to millisecond on epoch-micros column. */
public static long[] arrayDateTruncMilliMicros(long[] em, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], MICROS_PER_MILLI) * MICROS_PER_MILLI;
}
return r;
}

/** DATE_TRUNC to second on epoch-micros column. */
public static long[] arrayDateTruncSecondMicros(long[] em, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], MICROS_PER_SECOND) * MICROS_PER_SECOND;
}
return r;
}

/** DATE_TRUNC to minute on epoch-micros column. */
public static long[] arrayDateTruncMinuteMicros(long[] em, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], MICROS_PER_MINUTE) * MICROS_PER_MINUTE;
}
return r;
}

/** DATE_TRUNC to hour on epoch-micros column. */
public static long[] arrayDateTruncHourMicros(long[] em, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], MICROS_PER_HOUR) * MICROS_PER_HOUR;
}
return r;
}

/** DATE_TRUNC to day on epoch-micros column. */
public static long[] arrayDateTruncDayMicros(long[] em, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], MICROS_PER_DAY) * MICROS_PER_DAY;
}
return r;
}

/** DATE_TRUNC to month on epoch-micros column. Uses Hinnant civil arithmetic. */
public static long[] arrayDateTruncMonthMicros(long[] em, int length) {
long[] r = new long[length];
long[] ymd = new long[3];
for (int i = 0; i < length; i++) {
long epochDays = Math.floorDiv(em[i], MICROS_PER_DAY);
civilFromDays(epochDays, ymd);
r[i] = civilToDays(ymd[0], ymd[1], 1) * MICROS_PER_DAY;
}
return r;
}

/** DATE_TRUNC to year on epoch-micros column. */
public static long[] arrayDateTruncYearMicros(long[] em, int length) {
long[] r = new long[length];
long[] ymd = new long[3];
for (int i = 0; i < length; i++) {
long epochDays = Math.floorDiv(em[i], MICROS_PER_DAY);
civilFromDays(epochDays, ymd);
r[i] = civilToDays(ymd[0], 1, 1) * MICROS_PER_DAY;
}
return r;
}

/** Extract hour (0-23) from epoch-micros array. */
public static double[] arrayExtractHourMicros(long[] em, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) {
long t = Math.floorMod(em[i], MICROS_PER_DAY);
r[i] = (double) (t / MICROS_PER_HOUR);
}
return r;
}

/** Extract minute (0-59) from epoch-micros array. */
public static double[] arrayExtractMinuteMicros(long[] em, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) {
long t = Math.floorMod(em[i], MICROS_PER_HOUR);
r[i] = (double) (t / MICROS_PER_MINUTE);
}
return r;
}

/** Extract second (0-59) from epoch-micros array. */
public static double[] arrayExtractSecondMicros(long[] em, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) {
long t = Math.floorMod(em[i], MICROS_PER_MINUTE);
r[i] = (double) (t / MICROS_PER_SECOND);
}
return r;
}

/** Extract millisecond-of-second (0-999) from epoch-micros array. */
public static double[] arrayExtractMillisecondMicros(long[] em, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) {
long t = Math.floorMod(em[i], MICROS_PER_SECOND);
r[i] = (double) (t / MICROS_PER_MILLI);
}
return r;
}

/** Extract microsecond-of-second (0-999999) from epoch-micros array. */
public static double[] arrayExtractMicrosecondMicros(long[] em, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) {
r[i] = (double) Math.floorMod(em[i], MICROS_PER_SECOND);
}
return r;
}

/** DATE_ADD on epoch-micros: add N micros. */
public static long[] arrayDateAddMicrosMicros(long[] em, long n, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) r[i] = em[i] + n;
return r;
}

/** DATE_ADD months on epoch-micros column. */
public static long[] arrayDateAddMonthsMicros(long[] em, int nMonths, int length) {
long[] r = new long[length];
long[] ymd = new long[3];
for (int i = 0; i < length; i++) {
long s = em[i];
long epochDays = Math.floorDiv(s, MICROS_PER_DAY);
long timeOfDay = s - epochDays * MICROS_PER_DAY;
civilFromDays(epochDays, ymd);
long totalMonths = ymd[0] * 12 + (ymd[1] - 1) + nMonths;
long newYear = Math.floorDiv(totalMonths, 12);
long newMonth = Math.floorMod(totalMonths, 12) + 1;
long maxDay;
if (newMonth == 2) {
boolean leap = (newYear % 4 == 0 && newYear % 100 != 0) || (newYear % 400 == 0);
maxDay = leap ? 29 : 28;
} else if (newMonth == 4 || newMonth == 6 || newMonth == 9 || newMonth == 11) {
maxDay = 30;
} else {
maxDay = 31;
}
long day = Math.min(ymd[2], maxDay);
r[i] = civilToDays(newYear, newMonth, day) * MICROS_PER_DAY + timeOfDay;
}
return r;
}

/** DATE_DIFF in micros between two epoch-micros columns. */
public static double[] arrayDateDiffMicros(long[] a, long[] b, int length) {
double[] r = new double[length];
for (int i = 0; i < length; i++) r[i] = (double)(a[i] - b[i]);
return r;
}

/** TIME_BUCKET on epoch-micros column with arbitrary micro-width.
* Bucket boundaries are aligned to epoch (origin = 0). For each row:
* bucket = floor(em[i] / width) * width
* Width must be > 0. */
public static long[] arrayTimeBucketMicros(long[] em, long widthMicros, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(em[i], widthMicros) * widthMicros;
}
return r;
}

/** TIME_BUCKET on epoch-micros column with origin offset.
* bucket = floor((em[i] - origin) / width) * width + origin */
public static long[] arrayTimeBucketMicrosOrigin(long[] em, long widthMicros, long originMicros, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
long shifted = em[i] - originMicros;
r[i] = Math.floorDiv(shifted, widthMicros) * widthMicros + originMicros;
}
return r;
}

/** TIME_BUCKET on epoch-days column (DATE) with day-width. */
public static long[] arrayTimeBucketDays(long[] ed, long widthDays, int length) {
long[] r = new long[length];
for (int i = 0; i < length; i++) {
r[i] = Math.floorDiv(ed[i], widthDays) * widthDays;
}
return r;
}

/** TIME_BUCKET on epoch-days column with month-width. Aligned to month boundaries.
* Each input is converted to (year, month) total months since epoch and bucketed. */
public static long[] arrayTimeBucketMonths(long[] ed, int widthMonths, int length) {
long[] r = new long[length];
long[] ymd = new long[3];
for (int i = 0; i < length; i++) {
civilFromDays(ed[i], ymd);
// months since 1970-01: year*12 + (month-1), but adjusted so 1970-01 = 0
long totalMonths = (ymd[0] - 1970) * 12 + (ymd[1] - 1);
long bucket = Math.floorDiv(totalMonths, widthMonths) * widthMonths;
long bucketYear = 1970 + Math.floorDiv(bucket, 12);
long bucketMonth = Math.floorMod(bucket, 12) + 1;
r[i] = civilToDays(bucketYear, bucketMonth, 1);
}
return r;
}

// =========================================================================
// Parallel Execution
// =========================================================================