Skip to content

Commit e3c52c4

Browse files
authored
Refine emit after session close (#530)
1 parent 16f9df1 commit e3c52c4

File tree

1 file changed

+264
-35
lines changed

1 file changed

+264
-35
lines changed

docs/global-aggregation.md

Lines changed: 264 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -172,60 +172,289 @@ EMIT ON UPDATE WITH BATCH 1s;
172172

173173
This query checks for changes every second and emits results only when updates occur.
174174

175-
### `EMIT AFTER KEY EXPIRE`
175+
### `EMIT AFTER SESSION CLOSE`
176176

177-
Designed for **OpenTelemetry trace analysis** and other similar use cases where you need to track **key lifetimes** across high-cardinality datasets (e.g., trace spans).
177+
Designed for **sessionization**, **OpenTelemetry trace analysis** and other similar use cases where you need to track **session lifetimes** across high-cardinality datasets (e.g., trace spans).
178178

179-
This policy emits aggregation results once a key is considered **expired**.
179+
This policy emits aggregation results once a session is considered **closed** or **expired**.
180180

181181
**Syntax**:
182182

183183
```sql
184-
EMIT AFTER KEY EXPIRE [IDENTIFIED BY <col>] WITH [ONLY] MAXSPAN <interval> [AND TIMEOUT <interval>]
184+
EMIT AFTER SESSION CLOSE
185+
[IDENTIFIED BY (<ts_col>[, <session_start_col>, <session_end_col>])]
186+
WITH [ONLY] MAXSPAN <interval>
187+
[AND TIMEOUT <interval>]
185188
```
186189

187190
**Parameters**:
188-
* `EMIT AFTER KEY EXPIRE` - enables per-key lifetime tracking.
189-
* `IDENTIFIED BY <col>` - column used to compute span duration (defaults to **_tp_time** if omitted).
190-
* `MAXSPAN <interval>` - maximum allowed span before emission.
191-
* `ONLY` - emit results only if span exceeds MAXSPAN.
192-
* `TIMEOUT <interval>` - forces emission after inactivity to avoid waiting indefinitely.
191+
* `EMIT AFTER SESSION CLOSE`. Enables per-session lifetime tracking and emits results when a session ends.
192+
* `IDENTIFIED BY (<ts_col>[, <session_start_col>, <session_end_col>])`. Defines how session boundaries are identified.
193+
* `<ts_col>` — Timestamp column used to calculate session span duration. Default: _tp_time.
194+
* `<session_start_col>` — Boolean column indicating when a session starts. Usually it is predicates evaluated in the SELECT. (Optional)
195+
* `<session_end_col>` — Boolean column indicating when a session ends. Usually it is predicates evaluated in the SELECT. (Optional)
196+
* `MAXSPAN <interval>`. The maximum duration allowed for a session before it is emitted, regardless of new activity.
197+
* `ONLY`. When specified, results are emitted only if the session’s duration exceeds `MAXSPAN`.
198+
* `TIMEOUT <interval>`. Defines the maximum wall-clock duration a session can remain open. If a session stays active longer than this interval, it is automatically emitted to prevent indefinite waiting.
193199

194-
:::info
195-
Currently must be used with `SETTINGS default_hash_table='hybrid'` to prevent excessive memory usage.
196-
:::
200+
#### Variants of `IDENTIFIED BY`
197201

198-
**Example**:
202+
Different configurations of `IDENTIFIED BY` allow flexible session control depending on the availability of start/end indicators:
203+
204+
1. `IDENTIFIED BY ts_col`.
205+
206+
No explicit session start or end signals.
207+
* A session closes when `MAXSPAN` or `TIMEOUT` is reached.
208+
* All events for the same session key are included.
209+
210+
2. `IDENTIFIED BY (ts_col, session_start_col, session_end_col)`.
211+
212+
Both start and end conditions are explicitly defined.
213+
* A session opens when `session_start_col = true`.
214+
* Events before an open session are ignored.
215+
* The session closes when `session_end_col = true`, or when `MAXSPAN` or `TIMEOUT` is reached.
216+
217+
3. `IDENTIFIED BY (ts_col, session_start_col, false)`.
218+
219+
Only a session start condition is defined.
220+
* A session opens when `session_start_col = true`.
221+
* Events before a session opens are ignored.
222+
* The session closes when `MAXSPAN` or `TIMEOUT` is reached.
223+
224+
4. `IDENTIFIED BY (ts_col, true, session_end_col)`.
225+
226+
Only a session end condition is defined.
227+
* A session opens when the first event is observed.
228+
* The session closes when `session_end_col = true`, or when `MAXSPAN` or `TIMEOUT` is reached.
229+
230+
#### Session Fine-Tuning Settings
231+
232+
Timeplus provides several query settings to fine-tune session behavior:
233+
234+
1. `merge_open_sessions` — (Default: `false`).
235+
236+
Controls whether multiple overlapping or consecutive sessions for the same key should be merged into a single extended session. When set to `true`, if a new session starts before the previous one closes, Timeplus merges them into one continuous session.
237+
238+
2. `include_session_end` — (Default: `true`)
239+
240+
Determines whether the event that triggers the session end should be included in the final session output. When set to `false`, the session end event itself will be excluded from the emitted session data.
241+
242+
#### Examples
243+
244+
Assume you have millions of network devices that go through a series of **state transitions** before establishing a connection. You want to analyze metrics such as **time-to-successful-connect** and **consecutive failures** for each device in real time.
199245

200246
```sql
201-
WITH grouped AS
247+
CREATE STREAM IF NOT EXISTS devices
202248
(
203-
SELECT
204-
trace_id,
205-
min(start_time) AS start_ts,
206-
max(end_time) AS end_ts,
207-
date_diff('ms', start_ts, end_ts) AS span_ms,
208-
group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events
209-
FROM otel_traces
210-
SHUFFLE BY trace_id
211-
GROUP BY trace_id
212-
EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH ONLY MAXSPAN 500ms AND TIMEOUT 2s
249+
`device` string,
250+
`phase` string, -- 'assoc' -> 'auth' -> 'dhcp' -> 'dns' -> 'connection'
251+
`status` string -- 'success', 'failed'
252+
);
253+
```
254+
255+
In this schema:
256+
- `device` uniquely indentifies each device.
257+
- `phase` represents the current state in the connection workflow. The initial phase of a connection starts with `'assoc'` and ends with `'connection'` when successful.
258+
- `status` indicates whether the transition was successful or failed.
259+
260+
##### Example 1: Time-to-Successful-Connect
261+
262+
```sql
263+
-- Time to successful connect
264+
WITH connect_phase_events AS
265+
(
266+
SELECT
267+
*,
268+
phase = 'assoc' AS session_start, -- defines the session start predicates
269+
phase = 'connection' AND status = 'success' AS session_end -- defines the session ends predicates
270+
FROM
271+
devices
272+
WHERE phase IN ('assoc', 'auth', 'dhcp', 'dns', 'connection')
213273
)
214-
SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event
215-
FROM grouped
216-
SETTINGS
217-
default_hash_table='hybrid',
218-
max_hot_keys=1000000;
274+
SELECT device,
275+
count() AS events,
276+
count_if(status = 'failed') AS fails,
277+
min(_tp_time) AS session_start_ts,
278+
max(_tp_time) AS session_end_ts,
279+
date_diff('ms', session_start_ts, session_end_ts) AS time_to_successful_connect_ms
280+
FROM connect_phase_events
281+
GROUP BY device
282+
EMIT AFTER SESSION CLOSE IDENTIFIED BY (_tp_time, session_start, session_end) WITH MAXSPAN 1s AND TIMEOUT 2s;
219283
```
220284

221285
**Explanation**:
286+
- `session_start` marks when a session begins (`phase = 'assoc'`).
287+
- `session_end` marks when the connection succeeds (`phase = 'connection' AND status = 'success'`) and hence session ends.
288+
- `IDENTIFIED BY (_tp_time, session_start, session_end)` controls when sessions open and close.
289+
290+
**Sample events**:
291+
```sql
292+
INSERT INTO devices (device, phase, status, _tp_time) VALUES
293+
('dev1', 'assoc', 'success', '2025-01-01 00:00:00.000'),
294+
('dev1', 'auth', 'success', '2025-01-01 00:00:00.001'),
295+
('dev1', 'dhcp', 'success', '2025-01-01 00:00:00.002'),
296+
('dev1', 'dns', 'success', '2025-01-01 00:00:00.003'),
297+
('dev1', 'connection', 'success', '2025-01-01 00:00:01.100');
298+
```
299+
300+
**Output**:
301+
```
302+
┌─device─┬─events─┬─fails─┬────────session_start_ts─┬──────────session_end_ts─┬─time_to_successful_connect_ms─┐
303+
│ dev1 │ 5 │ 0 │ 2025-01-01 00:00:00.000 │ 2025-01-01 00:00:01.100 │ 1100 │
304+
└────────┴────────┴───────┴─────────────────────────┴─────────────────────────┴───────────────────────────────┘
305+
```
306+
307+
##### Example 2: Handling Failures and Retries
222308

223-
- Tracks `trace_id` events with start/end times.
224-
- Emits results when:
225-
- The span exceeds `MAXSPAN` (500 ms), or
226-
- No new events arrive for `TIMEOUT` (2 s).
227-
- The `ONLY` modifier ensures only traces exceeding the span threshold (500ms) are emitted.
228-
- Expired keys are garbage-collected after emission.
309+
If a device retries failed phases, use `merge_open_sessions = true` to merge overlapping sessions.
310+
311+
**Query**:
312+
```sql
313+
-- Time to successful connect
314+
WITH connect_phase_events AS
315+
(
316+
SELECT
317+
*,
318+
phase = 'assoc' AS session_start, -- defines the session start predicates
319+
phase = 'connection' AND status = 'success' AS session_end -- defines the session ends predicates
320+
FROM
321+
devices
322+
WHERE phase IN ('assoc', 'auth', 'dhcp', 'dns', 'connection')
323+
)
324+
SELECT device,
325+
count() AS events,
326+
count_if(status = 'failed') AS fails,
327+
min(_tp_time) AS session_start_ts,
328+
max(_tp_time) AS session_end_ts,
329+
date_diff('ms', session_start_ts, session_end_ts) AS time_to_successful_connect_ms
330+
FROM connect_phase_events
331+
GROUP BY device
332+
EMIT AFTER SESSION CLOSE IDENTIFIED BY (_tp_time, session_start, session_end) WITH MAXSPAN 1s AND TIMEOUT 2s
333+
SETTINGS merge_open_sessions = true; -- Merge open sessions
334+
```
335+
336+
**Sample Events**:
337+
```
338+
INSERT INTO devices (device, phase, status, _tp_time) VALUES
339+
('dev1', 'assoc', 'failed', '2025-01-01 00:00:00.000'),
340+
('dev1', 'assoc', 'failed', '2025-01-01 00:00:00.201'),
341+
('dev1', 'assoc', 'success', '2025-01-01 00:00:00.302'),
342+
('dev1', 'auth', 'success', '2025-01-01 00:00:00.403'),
343+
('dev1', 'dhcp', 'success', '2025-01-01 00:00:00.504'),
344+
('dev1', 'dns', 'success', '2025-01-01 00:00:00.805'),
345+
('dev1', 'connection', 'success', '2025-01-01 00:00:02.100');
346+
```
347+
348+
**Output**:
349+
```
350+
┌─device─┬─events─┬─fails─┬────────session_start_ts─┬──────────session_end_ts─┬─time_to_successful_connect_ms─┐
351+
│ dev1 │ 7 │ 2 │ 2025-01-01 00:00:00.000 │ 2025-01-01 00:00:02.100 │ 2100 │
352+
└────────┴────────┴───────┴─────────────────────────┴─────────────────────────┴───────────────────────────────┘
353+
```
354+
355+
Without `merge_open_sessions = true`, each `assoc` event will start a new session and the next `assoc` event will force close the previous session, so there will be multiple sessions emitted.
356+
357+
##### Example 3: Handling Out-of-Order Events
358+
359+
If events arrive slightly out of order, you can use an `IDENTIFIED BY` variant to open sessions for any event from the same device.
360+
361+
**Query**:
362+
```sql
363+
WITH connect_phase_events AS
364+
(
365+
SELECT
366+
*,
367+
phase = 'connection' AND status = 'success' AS session_end -- defines the session ends predicates
368+
FROM
369+
devices
370+
WHERE phase IN ('assoc', 'auth', 'dhcp', 'dns', 'connection')
371+
)
372+
SELECT device,
373+
count() AS events,
374+
count_if(status = 'failed') AS fails,
375+
min(_tp_time) AS session_start_ts,
376+
max(_tp_time) AS session_end_ts,
377+
date_diff('ms', session_start_ts, session_end_ts) AS time_to_successful_connect_ms
378+
FROM connect_phase_events
379+
GROUP BY device
380+
EMIT AFTER SESSION CLOSE IDENTIFIED BY (_tp_time, true, session_end) WITH MAXSPAN 1s AND TIMEOUT 2s
381+
SETTINGS merge_open_sessions = true;
382+
```
383+
384+
**Sample Events**:
385+
```sql
386+
-- out of order
387+
INSERT INTO devices (device, phase, status, _tp_time) VALUES
388+
('dev1', 'auth', 'success', '2025-01-01 00:00:00.001'),
389+
('dev1', 'dhcp', 'success', '2025-01-01 00:00:00.002'),
390+
('dev1', 'assoc', 'success', '2025-01-01 00:00:00.000'),
391+
('dev1', 'dns', 'success', '2025-01-01 00:00:00.003'),
392+
('dev1', 'connection', 'success', '2025-01-01 00:00:01.100');
393+
```
394+
395+
**Output**:
396+
```
397+
┌─device─┬─events─┬─fails─┬────────session_start_ts─┬──────────session_end_ts─┬─time_to_successful_connect_ms─┐
398+
│ dev1 │ 5 │ 0 │ 2025-01-01 00:00:00.000 │ 2025-01-01 00:00:01.100 │ 1100 │
399+
└────────┴────────┴───────┴─────────────────────────┴─────────────────────────┴───────────────────────────────┘
400+
```
401+
402+
Here, `IDENTIFIED BY (_tp_time, true, session_end)` means:
403+
- Any event for the device can open a session since `session_start` is `true`.
404+
- The session closes when the `session_end` condition is met.
405+
406+
##### Example 4: Consecutive Failure Metrics
407+
408+
To calculate **consecutive failures** per phase:
409+
1. A failure (`status = 'failed'`) starts a session.
410+
2. The session ends when a success (`status = 'success'`) occurs.
411+
3. Use `include_session_end = false` setting to exclude the successful event which ends the session from the count (the session).
412+
413+
**Query**:
414+
```sql
415+
WITH connect_phase_events AS
416+
(
417+
SELECT
418+
*,
419+
status = 'failed' AS session_start, -- defines session start predicates
420+
status = 'success' AS session_end -- defines session end predicates
421+
FROM
422+
devices
423+
WHERE phase IN ('assoc', 'auth', 'dhcp', 'dns', 'connection')
424+
)
425+
SELECT device,
426+
phase,
427+
count() AS consecutive_fails,
428+
min(_tp_time) AS session_start_ts,
429+
max(_tp_time) AS session_end_ts
430+
FROM connect_phase_events
431+
GROUP BY device, phase
432+
EMIT AFTER SESSION CLOSE IDENTIFIED BY (_tp_time, session_start, session_end) WITH MAXSPAN 1s AND TIMEOUT 2s
433+
SETTINGS
434+
include_session_end = false, -- Exclude the session ends events from the session
435+
merge_open_sessions=true;
436+
```
437+
438+
**Sample Events**:
439+
```
440+
INSERT INTO devices (device, phase, status, _tp_time) VALUES
441+
('dev1', 'assoc', 'failed', '2025-01-01 00:00:00.000'),
442+
('dev1', 'assoc', 'failed', '2025-01-01 00:00:00.201'),
443+
('dev1', 'assoc', 'success', '2025-01-01 00:00:00.302'),
444+
('dev1', 'auth', 'success', '2025-01-01 00:00:00.403'),
445+
('dev1', 'dhcp', 'failed', '2025-01-01 00:00:00.504'),
446+
('dev1', 'dhcp', 'success', '2025-01-01 00:00:00.604'),
447+
('dev1', 'dns', 'success', '2025-01-01 00:00:00.805'),
448+
('dev1', 'connection', 'success', '2025-01-01 00:00:02.100');
449+
```
450+
451+
**Output**:
452+
```
453+
┌─device─┬─phase─┬─consecutive_fails─┬────────session_start_ts─┬──────────session_end_ts─┐
454+
│ dev1 │ assoc │ 2 │ 2025-01-01 00:00:00.000 │ 2025-01-01 00:00:00.201 │
455+
│ dev1 │ dhcp │ 1 │ 2025-01-01 00:00:00.504 │ 2025-01-01 00:00:00.504 │
456+
└────────┴───────┴───────────────────┴─────────────────────────┴─────────────────────────┘
457+
```
229458

230459
### `EMIT PER EVENT`
231460

0 commit comments

Comments
 (0)