Commit b312eea

refine global window (#520)

1 parent 1d7c17a commit b312eea

3 files changed: +142 -160 lines changed

docs/global-aggregation.md

Lines changed: 140 additions & 158 deletions
@@ -1,35 +1,27 @@
 # Global Aggregation
 
-## Overview
+## Overview
 
-**Global aggregation** refers to running an aggregation query **without using streaming windows** such as `TUMBLE`, `HOP`, or `SESSION`.
+**Global aggregation** refers to running an aggregation query **without using time-based windows** such as [tumble](/tumble-aggregation), [hop](/hop-aggregation), or [session](/session-aggregation).
 
-Unlike windowed aggregations, global aggregation does not slice unbounded streaming data into time-based windows. Instead, it treats the entire unbounded stream as a **single global window**.
+Unlike windowed aggregations that slice unbounded streams into discrete windows, **global aggregation** treats the entire stream as **a single continuous window**.
 
-With global aggregation:
-- The query continuously updates aggregation results over all incoming data.
-- Users don’t need to worry about **late events**, since there are no time windows to close.
+With global aggregation:
+- The query continuously updates aggregation results as new events arrive.
+- There is **no concept of window close**, so late events are naturally handled without additional logic.
+- It is ideal for tracking long-running (lifetime) metrics such as total counts, averages, or unique users across the entire history of a stream.
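+
+For instance, a lifetime unique-user count needs no window at all. A minimal sketch, assuming a hypothetical stream `page_views` with a `user_id` column:
+
+```sql
+-- hypothetical stream: page_views(user_id string)
+SELECT count_distinct(user_id) AS lifetime_users
+FROM page_views;
+```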
 
 ## Syntax
 
 ```sql
-SELECT <column_name1>, <column_name2>, <aggr_function>
+SELECT <grouping_keys>, <aggr_functions>
 FROM <stream_name>
 [WHERE <condition>]
-GROUP BY col1, col2, ...
-EMIT PERIODIC <n><UNIT>
+GROUP BY <col1>, <col2>, ...
+EMIT <emit_policies>
 ```
 
-The `EMIT PERIODIC <n><UNIT>` clause tells Timeplus to periodically emit aggregation results.
-- `<n>` must be an integer greater than 0.
-- `<UNIT>` can be one of:
-
-  - `ms` (milliseconds)
-  - `s` (seconds)
-  - `m` (minutes)
-  - `h` (hours)
-
-**Examples:**
+**Example**:
 ```sql
 SELECT device, count(*)
 FROM device_utils
@@ -38,25 +30,26 @@ GROUP BY device
 EMIT PERIODIC 5s
 ```
 
-In this example:
-- The query continuously monitors new events in the stream `device_utils`.
-- It filters rows where cpu_usage > 99.
-- An **incremental count aggregation** is maintained per `device`.
-- Every **5 seconds**, the current aggregation result is emitted to clients.
+**Explanation**:
+- The query monitors new events from the stream `device_utils`.
+- It filters rows where `cpu_usage > 99`.
+- An **incremental count** is maintained per device.
+- Every **5 seconds**, the latest count per device is emitted to clients.
 
-## TTL of Aggregation Keys
+## TTL of Aggregation Keys
 
-Global aggregation does not automatically garbage-collect intermediate aggregation states after each emit.
-If the grouping keys grow monotonically over time (for example, when timestamps are part of the key), memory usage can eventually **blow up**.
+Global aggregations do not automatically garbage-collect intermediate states after each emission by default.
+If the grouping keys increase continuously (for example, by including timestamps), the aggregation state can grow indefinitely.
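+
+A sketch of such an ever-growing key set, assuming `device_utils` carries the default `_tp_time` event-time column:
+
+```sql
+-- the minute bucket in the grouping key grows forever
+SELECT date_trunc('minute', _tp_time) AS minute, device, count()
+FROM device_utils
+GROUP BY minute, device;
+```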
 
-To address this challenge, you can use a **hybrid hash table** for aggregation states:
-- Hot keys are kept in memory.
-- Cold keys are spilled to disk using an LRU-like algorithm.
-- Combined with a **TTL for keys**, this approach provides the best of both worlds:
-  - Handles very late events.
-  - Prevents unbounded memory growth.
+To handle this, Timeplus supports a **hybrid hash table** that combines in-memory and on-disk state management:
 
-**Example:**
+- **Hot keys** (recently active) are stored in memory.
+- **Cold keys** (inactive or rarely updated) are spilled to disk using an LRU-like algorithm.
+- Combined with **TTL-based cleanup**, this approach has these benefits:
+  - Support for very late events.
+  - Controlled memory usage for long-running queries.
+
+**Example**:
 
 ```sql
 CREATE STREAM device_utils(
@@ -83,180 +76,169 @@ SETTINGS
   aggregate_state_ttl_sec=3600;
 ```
 
-- This query performs a **global aggregation** to calculate CPU metrics in **5-minute buckets per device**.
-- The grouping key includes `bucket_window_start`, which increases monotonically with time.
-- The hybrid hash table is enabled via `default_hash_table='hybrid'`.
-  - Keeps up to `100,000` hot keys in memory per substream.
-  - Cold keys are spilled to disk automatically.
-- The TTL is set to `3600` seconds (`aggregate_state_ttl_sec=3600`):
-  - Keys not updated for an hour are garbage-collected from disk.
-  - Prevents infinite state accumulation.
-- Data shuffling is enabled (SHUFFLE BY location) for better **parallelism and memory efficiency**.
-  - See [Data Shuffle](/shuffle-data) for more details.
+**Explanation**:
+
+- This query performs a **global aggregation** that computes CPU metrics for each device in 5-minute intervals.
+- The grouping key includes `bucket_window_start`, `location`, and `device`, so its cardinality grows monotonically over time.
+- The hybrid hash table manages this monotonically growing state efficiently:
+  - Up to `100,000` hot keys per substream remain in memory.
+  - Inactive keys are spilled to disk automatically.
+  - Aggregation states that see no updates for 1 hour (`aggregate_state_ttl_sec=3600`) are cleaned up, which effectively tolerates events up to 1 hour late.
+- `SHUFFLE BY location` improves **parallelism and memory efficiency**. See [Data Shuffle](/shuffle-data) for more details.
+
+**Internal Pipeline**:
 
-The internal query plan for this hybrid global aggregation looks like:
+The internal execution plan for hybrid global aggregation is shown below:
 
 ![HybridAggregationPipeline](/img/hybrid-aggregation-pipeline.svg)
 
 ## Emit Policies
 
-Global aggregation supports different `emit policies` to control when you like to get the intermidiate results pushing out.
+Global aggregation supports multiple **emit policies** that define **when intermediate results** are pushed out.
 
-### EMIT PERIODIC {#emit_periodic}
+### `EMIT PERIODIC`
 
-`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`<n>` shall be an integer greater than 0.
+Emits aggregation results periodically **when new events arrive**.
+This is the **default** emit policy for global aggregation, with a **default interval of 2 seconds**.
 
-Example:
+**Syntax**
 
 ```sql
-SELECT device, count(*)
-FROM device_utils
-WHERE cpu_usage > 99
-EMIT PERIODIC 5s
+EMIT PERIODIC <n><UNIT>
 ```
 
-For [Global Streaming Aggregation](#global) the default periodic emit interval is `2s`, i.e. 2 seconds.
-
-You can also apply `EMIT PERIODIC` in time windows, such as tumble/hop/session.
-
-When you run a tumble window aggregation, by default Timeplus will emit results when the window is closed. So `tumble(stream,5s)` will emit results every 5 seconds, unless there is no event in the window to progress the watermark.
+**Parameters**:
+- `<n>` — positive integer (interval length)
+- `<UNIT>` can be one of:
+  - `ms` (milliseconds)
+  - `s` (seconds)
+  - `m` (minutes)
+  - `h` (hours)
 
-In some cases, you may want to get aggregation results even the window is not closed, so that you can get timely alerts. For example, the following SQL will run a 5-second tumble window and every 1 second, if the number of event is over 300, a row will be emitted.
 
+**Example**:
 ```sql
-SELECT window_start, count() AS cnt
-FROM tumble(car_live_data, 5s)
-GROUP BY window_start
-HAVING cnt > 300
-EMIT PERIODIC 1s
+SELECT device, count(*)
+FROM device_utils
+WHERE cpu_usage > 99
+GROUP BY device
+EMIT PERIODIC 5s;
 ```
 
-### EMIT PERIODIC REPEAT {#emit_periodic_repeat}
+This query emits updated results every 5 seconds if new events are received.
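+
+Without an explicit `EMIT` clause, the default policy applies, so the following behaves as if `EMIT PERIODIC 2s` were appended:
+
+```sql
+SELECT device, count(*)
+FROM device_utils
+WHERE cpu_usage > 99
+GROUP BY device;
+```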
 
-Starting from Timeplus Proton 1.6.2, you can optionally add `REPEAT` to the end of `EMIT PERIODIC <n><UNIT>`. For global aggregations, by default every 2 seconds, the aggregation result will be emitted. But if there is no new event since last emit, no result will be emitted. With the `REPEAT` at the end of the emit policy, Timeplus will emit results at the fixed interval, even there is no new events since last emit. For example:
-```sql
-SELECT count() FROM t
-EMIT PERIODIC 3s REPEAT
-```
+### `EMIT PERIODIC REPEAT`
 
-### EMIT TIMEOUT
+For `EMIT PERIODIC`, no results are emitted if there are **no new events** since the last emit.
+With the `REPEAT` modifier, Timeplus **emits at a fixed interval**, even when no new data arrives.
 
-You can apply `EMIT TIMEOUT` on global aggregation, e.g.
+**Example**:
 ```sql
-SELECT count() FROM t EMIT TIMEOUT 1s;
+SELECT device, count(*)
+FROM device_utils
+WHERE cpu_usage > 99
+GROUP BY device
+EMIT PERIODIC 5s REPEAT
 ```
 
-It also can be applied to window aggregations and `EMIT AFTER WINDOW CLOSE` is automatically appended, e.g.
-```sql
-SELECT count() FROM tumble(t,5s) GROUP BY window_start EMIT TIMEOUT 1s;
-```
-
-### EMIT ON UPDATE {#emit_on_update}
+If no new events appear, the last results are still emitted every 5 seconds.
+
+### `EMIT ON UPDATE`
 
-You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
+Emits intermediate results **immediately** when new events change any aggregation value. This is useful for near real-time visibility into evolving metrics.
 
 ```sql
-SELECT
-  window_start, cid, count() AS cnt
-FROM
-  tumble(car_live_data, 5s)
-WHERE
-  cid IN ('c00033', 'c00022')
-GROUP BY
-  window_start, cid
-EMIT ON UPDATE
+SELECT device, count(*)
+FROM device_utils
+WHERE cpu_usage > 99
+GROUP BY device
+EMIT ON UPDATE;
 ```
 
-During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
+Each time new events with `cpu_usage > 99` arrive, updated counts are emitted.
 
-### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
-
-You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
-```sql
-SELECT
-  window_start, cid, count() AS cnt
-FROM
-  tumble(car_live_data, 5s)
-WHERE
-  cid IN ('c00033', 'c00022')
-GROUP BY
-  window_start, cid
-EMIT ON UPDATE WITH BATCH 2s
-```
+### `EMIT ON UPDATE WITH BATCH`
 
-### EMIT AFTER KEY EXPIRE IDENTIFIED BY .. WITH MAXSPAN .. AND TIMEOUT .. {#emit_after_key_expire}
+Combines **periodic emission** with **update-based** triggers.
+Timeplus checks the intermediate aggregation results at regular intervals and emits them only if they have changed, which can significantly improve emit efficiency and throughput compared with `EMIT ON UPDATE`.
 
-The syntax is:
 ```sql
-EMIT AFTER KEY EXPIRE [IDENTIFIED BY <col>] WITH [ONLY] MAXSPAN <internal> [AND TIMEOUT <internal>]
+SELECT device, count(*)
+FROM device_utils
+WHERE cpu_usage > 99
+GROUP BY device
+EMIT ON UPDATE WITH BATCH 1s;
 ```
 
-Note:
-* `EMIT AFTER KEY EXPIRE` will emit results when the keys are expired. This EMIT policy ought to be applied to a global aggregation with a primary key as `GROUP BY`, usually using an ID for multiple tracing events.
-* `IDENTIFIED BY col` will calculate the span of the trace, usually you can set `IDENTIFIED BY _tp_time`.
-* `MAXSPAN interval` to identify whether the span of the related events over a certain interval, for example `MAXSPAN 500ms` to flag those events with same tracing ID but over 0.5 second span.
-* `ONLY`: if you add this keyword, then only those events over the `MAXSPAN` will be emitted, other events less than the `MAXSPAN` will be omitted, so that you can focus on those events over the SLA.
-* `AND TIMEOUT interval` to avoid waiting for late events for too long. If there is no more events with the same key (e.g. tracing ID) after this interval, Timeplus will close the session for the key and emit results.
+This query checks for changes every second and emits results only when updates occur.
 
-It's required to use `SETTINGS default_hash_table='hybrid'` with this emit policy to avoid putting too much data in memory.
+### `EMIT AFTER KEY EXPIRE`
 
-Here is an example to get the log streams and only show the events with over 0.5 second as the end-to-end latency.
-```sql
-WITH grouped AS(
-SELECT
-  trace_id,
-  min(start_time) AS start_ts,
-  max(end_time) AS end_ts,
-  date_diff('ms', start_ts, end_ts) AS span_ms,
-  group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events
-FROM otel_traces
-GROUP BY trace_id
-EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH MAXSPAN 500ms AND TIMEOUT 2s
-)
-SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event FROM grouped
-SETTINGS default_hash_table='hybrid', max_hot_keys=1000000, allow_independent_shard_processing=true;
-```
+Designed for **OpenTelemetry trace analysis** and similar use cases where you need to track **key lifetimes** across high-cardinality datasets (e.g., trace spans).
 
-### EMIT PER EVENT
-This emit policy allows you to emit results for every event in the stream, which can be useful for debugging or monitoring purposes.
+This policy emits aggregation results once a key is considered **expired**.
 
-For example, if you create a random stream `market_data` and run:
-```sql
-select count() from market_data
-```
-You will get the count of all events in the stream, every 2 seconds by default. Such as 10, 20, 30, etc.
+**Syntax**:
 
-If you want to emit results for every event, you can use:
 ```sql
-select count() from market_data emit per event
+EMIT AFTER KEY EXPIRE [IDENTIFIED BY <col>] WITH [ONLY] MAXSPAN <interval> [AND TIMEOUT <interval>]
 ```
-You will get the count of all events in the stream, every time a new event is added to the stream. Such as 1, 2, 3, 4, 5, etc.
 
-This new emit policy is useful for specific use cases where you want to see the results of your query for every event in the stream. It can be particularly useful for debugging or monitoring purposes, as it allows you to see the results of your query in real-time as new events are added to the stream.
+**Parameters**:
+* `EMIT AFTER KEY EXPIRE` - enables per-key lifetime tracking.
+* `IDENTIFIED BY <col>` - the column used to compute span duration (defaults to `_tp_time` if omitted).
+* `MAXSPAN <interval>` - maximum allowed span before emission.
+* `ONLY` - emit results only if the span exceeds `MAXSPAN`.
+* `TIMEOUT <interval>` - forces emission after inactivity, to avoid waiting indefinitely for late events.
 
-For high throughput streams, you may want to use this emit policy with caution, as it can generate a lot of output and may impact the performance of your query.
+:::info
+Currently this policy must be used with `SETTINGS default_hash_table='hybrid'` to prevent excessive memory usage.
+:::
 
-There are some limitations for this emit policy:
+**Example**:
 
-It does not support parallel processing, so it may not be suitable for high throughput streams. If there are multiple partitions for the Kafka external stream or multiple shards for the Timeplus stream, this emit policy will not work.
-
-One workaround is to use `SHUFFLE BY` to shuffle the data into one partition or shard, but this may impact the performance of your query. For example, you can use:
 ```sql
-select type, count() from github_events shuffle by type group by type emit per event;
+WITH grouped AS
+(
+  SELECT
+    trace_id,
+    min(start_time) AS start_ts,
+    max(end_time) AS end_ts,
+    date_diff('ms', start_ts, end_ts) AS span_ms,
+    group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events
+  FROM otel_traces
+  SHUFFLE BY trace_id
+  GROUP BY trace_id
+  EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH ONLY MAXSPAN 500ms AND TIMEOUT 2s
+)
+SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event
+FROM grouped
+SETTINGS
+  default_hash_table='hybrid',
+  max_hot_keys=1000000;
 ```
 
-The other possible workaround if the stream's sharding expression is based on id, for example:
-```sql
-create stream multi_shards_stream(id int, ...) settings shards=3, sharding_expr='weak_hash32(id)';
-```
-In this case, you can set `allow_independent_shard_processing=true` to process in parallel.
+**Explanation**:
 
-```sql
-SELECT id, count() FROM multi_shards_stream GROUP BY id EMIT PER EVENT
-SETTINGS allow_independent_shard_processing=true;
-```
+- Tracks `trace_id` events with start/end times.
+- Emits results when:
+  - The span exceeds `MAXSPAN` (500 ms), or
+  - No new events arrive for `TIMEOUT` (2 s).
+- The `ONLY` modifier ensures only traces exceeding the span threshold (500 ms) are emitted.
+- Expired keys are garbage-collected after emission.
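+
+A smaller sketch of the same policy, assuming a hypothetical stream `user_events` with a `session_id` column and relying on the default `IDENTIFIED BY _tp_time`:
+
+```sql
+SELECT session_id, count() AS events
+FROM user_events
+GROUP BY session_id
+EMIT AFTER KEY EXPIRE WITH MAXSPAN 30s AND TIMEOUT 10s
+SETTINGS default_hash_table='hybrid';
+```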
 
-The other limitation is that it does not support substream processing. For example, the following query will not work:
+### `EMIT PER EVENT`
+
+Emits results for **every incoming event**.
+This policy is mainly for debugging or low-volume streams, as it can produce very high output.
+
+**Example**:
 ```sql
-SELECT id, count() FROM single_shard_stream partition by id EMIT PER EVENT
+SELECT count()
+FROM market_data
+EMIT PER EVENT;
 ```
+Each new event triggers an immediate emission of the updated count:
+`1, 2, 3, 4, 5, …`
+
+Use this mode carefully in high-throughput environments.
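+
+For grouped per-event counts on a multi-partition or multi-shard stream, a sketch adapted from the earlier `github_events` example (assuming that stream exists) routes each key to a single shard with `SHUFFLE BY`:
+
+```sql
+SELECT type, count()
+FROM github_events
+SHUFFLE BY type
+GROUP BY type
+EMIT PER EVENT;
+```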

docs/hop-aggregation.md

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ In this example:
 
 ### `EMIT ON UPDATE`
 
-Emits **intermediate aggregation updates** whenever the results change within an open window.
+Emits **intermediate aggregation updates** whenever new events flow into an open window.
 This is useful for near real-time visibility into evolving metrics.
 
 **Example**:
