Skip to content

Commit 7d3977f

Browse files
authored
refactor window aggregation and emit policies (#487)
* refactor window aggregation and emit policies * refactor window aggregation and emit policies
1 parent e3cd18c commit 7d3977f

File tree

6 files changed

+481
-9
lines changed

6 files changed

+481
-9
lines changed

docs/global-aggregation.md

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
# Global Aggregation
2+
3+
## Overview
4+
5+
In Timeplus, we define global aggregation as an aggregation query without using streaming windows like tumble, hop. Unlike streaming window aggregation, global streaming aggregation doesn't slice
6+
the unbound streaming data into windows according to timestamp, instead it processes the unbounded streaming data as one huge big global window. Due to this property, Timeplus for now can't
7+
recycle in-memory aggregation states / results according to timestamp for global aggregation.
8+
9+
```sql
10+
SELECT <column_name1>, <column_name2>, <aggr_function>
11+
FROM <stream_name>
12+
[WHERE clause]
13+
EMIT PERIODIC [<n><UNIT>]
14+
```
15+
16+
`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`<n>` shall be an integer greater than 0.
17+
18+
Examples
19+
20+
```sql
21+
SELECT device, count(*)
22+
FROM device_utils
23+
WHERE cpu_usage > 99
24+
EMIT PERIODIC 5s
25+
```
26+
27+
Like in [Streaming Tail](/query-syntax#streaming-tailing), Timeplus continuously monitors new events in the stream `device_utils`, does the filtering and then continuously does **incremental** count aggregation. Whenever the specified delay interval is up, project the current aggregation result to clients.
28+
29+
## Emit Policies
30+
31+
### EMIT PERIODIC {#emit_periodic}
32+
33+
`PERIODIC <n><UNIT>` tells Timeplus to emit the aggregation periodically. `UNIT` can be ms(millisecond), s(second), m(minute),h(hour),d(day).`<n>` shall be an integer greater than 0.
34+
35+
Example:
36+
37+
```sql
38+
SELECT device, count(*)
39+
FROM device_utils
40+
WHERE cpu_usage > 99
41+
EMIT PERIODIC 5s
42+
```
43+
44+
For [Global Streaming Aggregation](#global) the default periodic emit interval is `2s`, i.e. 2 seconds.
45+
46+
You can also apply `EMIT PERIODIC` in time windows, such as tumble/hop/session.
47+
48+
When you run a tumble window aggregation, by default Timeplus will emit results when the window is closed. So `tumble(stream,5s)` will emit results every 5 seconds, unless there is no event in the window to progress the watermark.
49+
50+
In some cases, you may want to get aggregation results even the window is not closed, so that you can get timely alerts. For example, the following SQL will run a 5-second tumble window and every 1 second, if the number of event is over 300, a row will be emitted.
51+
52+
```sql
53+
SELECT window_start, count() AS cnt
54+
FROM tumble(car_live_data, 5s)
55+
GROUP BY window_start
56+
HAVING cnt > 300
57+
EMIT PERIODIC 1s
58+
```
59+
60+
### EMIT PERIODIC REPEAT {#emit_periodic_repeat}
61+
62+
Starting from Timeplus Proton 1.6.2, you can optionally add `REPEAT` to the end of `EMIT PERIODIC <n><UNIT>`. For global aggregations, by default every 2 seconds, the aggregation result will be emitted. But if there is no new event since last emit, no result will be emitted. With the `REPEAT` at the end of the emit policy, Timeplus will emit results at the fixed interval, even there is no new events since last emit. For example:
63+
```sql
64+
SELECT count() FROM t
65+
EMIT PERIODIC 3s REPEAT
66+
```
67+
68+
### EMIT TIMEOUT
69+
70+
You can apply `EMIT TIMEOUT` on global aggregation, e.g.
71+
```sql
72+
SELECT count() FROM t EMIT TIMEOUT 1s;
73+
```
74+
75+
It also can be applied to window aggregations and `EMIT AFTER WINDOW CLOSE` is automatically appended, e.g.
76+
```sql
77+
SELECT count() FROM tumble(t,5s) GROUP BY window_start EMIT TIMEOUT 1s;
78+
```
79+
80+
### EMIT ON UPDATE {#emit_on_update}
81+
82+
You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
83+
84+
```sql
85+
SELECT
86+
window_start, cid, count() AS cnt
87+
FROM
88+
tumble(car_live_data, 5s)
89+
WHERE
90+
cid IN ('c00033', 'c00022')
91+
GROUP BY
92+
window_start, cid
93+
EMIT ON UPDATE
94+
```
95+
96+
During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
97+
98+
### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
99+
100+
You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
101+
```sql
102+
SELECT
103+
window_start, cid, count() AS cnt
104+
FROM
105+
tumble(car_live_data, 5s)
106+
WHERE
107+
cid IN ('c00033', 'c00022')
108+
GROUP BY
109+
window_start, cid
110+
EMIT ON UPDATE WITH BATCH 2s
111+
```
112+
113+
### EMIT AFTER KEY EXPIRE IDENTIFIED BY .. WITH MAXSPAN .. AND TIMEOUT .. {#emit_after_key_expire}
114+
115+
The syntax is:
116+
```sql
117+
EMIT AFTER KEY EXPIRE [IDENTIFIED BY <col>] WITH [ONLY] MAXSPAN <internal> [AND TIMEOUT <internal>]
118+
```
119+
120+
Note:
121+
* `EMIT AFTER KEY EXPIRE` will emit results when the keys are expired. This EMIT policy ought to be applied to a global aggregation with a primary key as `GROUP BY`, usually using an ID for multiple tracing events.
122+
* `IDENTIFIED BY col` will calculate the span of the trace, usually you can set `IDENTIFIED BY _tp_time`.
123+
* `MAXSPAN interval` to identify whether the span of the related events over a certain interval, for example `MAXSPAN 500ms` to flag those events with same tracing ID but over 0.5 second span.
124+
* `ONLY`: if you add this keyword, then only those events over the `MAXSPAN` will be emitted, other events less than the `MAXSPAN` will be omitted, so that you can focus on those events over the SLA.
125+
* `AND TIMEOUT interval` to avoid waiting for late events for too long. If there is no more events with the same key (e.g. tracing ID) after this interval, Timeplus will close the session for the key and emit results.
126+
127+
It's required to use `SETTINGS default_hash_table='hybrid'` with this emit policy to avoid putting too much data in memory.
128+
129+
Here is an example to get the log streams and only show the events with over 0.5 second as the end-to-end latency.
130+
```sql
131+
WITH grouped AS(
132+
SELECT
133+
trace_id,
134+
min(start_time) AS start_ts,
135+
max(end_time) AS end_ts,
136+
date_diff('ms', start_ts, end_ts) AS span_ms,
137+
group_array(json_encode(span_id, parent_span_id, name, start_time, end_time, attributes)) AS trace_events
138+
FROM otel_traces
139+
GROUP BY trace_id
140+
EMIT AFTER KEY EXPIRE IDENTIFIED BY end_time WITH MAXSPAN 500ms AND TIMEOUT 2s
141+
)
142+
SELECT json_encode(trace_id, start_ts, end_ts, span_ms, trace_events) AS event FROM grouped
143+
SETTINGS default_hash_table='hybrid', max_hot_keys=1000000, allow_independent_shard_processing=true;
144+
```
145+
146+
### EMIT PER EVENT
147+
This emit policy allows you to emit results for every event in the stream, which can be useful for debugging or monitoring purposes.
148+
149+
For example, if you create a random stream `market_data` and run:
150+
```sql
151+
select count() from market_data
152+
```
153+
You will get the count of all events in the stream, every 2 seconds by default. Such as 10, 20, 30, etc.
154+
155+
If you want to emit results for every event, you can use:
156+
```sql
157+
select count() from market_data emit per event
158+
```
159+
You will get the count of all events in the stream, every time a new event is added to the stream. Such as 1, 2, 3, 4, 5, etc.
160+
161+
This new emit policy is useful for specific use cases where you want to see the results of your query for every event in the stream. It can be particularly useful for debugging or monitoring purposes, as it allows you to see the results of your query in real-time as new events are added to the stream.
162+
163+
For high throughput streams, you may want to use this emit policy with caution, as it can generate a lot of output and may impact the performance of your query.
164+
165+
There are some limitations for this emit policy:
166+
167+
It does not support parallel processing, so it may not be suitable for high throughput streams. If there are multiple partitions for the Kafka external stream or multiple shards for the Timeplus stream, this emit policy will not work.
168+
169+
One workaround is to use `SHUFFLE BY` to shuffle the data into one partition or shard, but this may impact the performance of your query. For example, you can use:
170+
```sql
171+
select type, count() from github_events shuffle by type group by type emit per event;
172+
```
173+
174+
The other possible workaround if the stream's sharding expression is based on id, for example:
175+
```sql
176+
create stream multi_shards_stream(id int, ...) settings shards=3, sharding_expr='weak_hash32(id)';
177+
```
178+
In this case, you can set `allow_independent_shard_processing=true` to process in parallel.
179+
180+
```sql
181+
SELECT id, count() FROM multi_shards_stream GROUP BY id EMIT PER EVENT
182+
SETTINGS allow_independent_shard_processing=true;
183+
```
184+
185+
The other limitation is that it does not support substream processing. For example, the following query will not work:
186+
```sql
187+
SELECT id, count() FROM single_shard_stream partition by id EMIT PER EVENT
188+
```

docs/hop-aggregation.md

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Hop Window Aggregation {#hop_window}
2+
3+
## Overview
4+
5+
Like [Tumble](#tumble), Hop also slices the unbounded streaming data into smaller windows, and it has an additional sliding step.
6+
7+
```sql
8+
SELECT <column_name1>, <column_name2>, <aggr_function>
9+
FROM hop(<stream_name>, [<timestamp_column>], <hop_slide_size>, [hop_windows_size], [<time_zone>])
10+
[WHERE clause]
11+
GROUP BY [<window_start | window_end>], ...
12+
EMIT <window_emit_policy>
13+
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
14+
```
15+
16+
Hop window is a more generalized window compared to tumble window. Hop window has an additional
17+
parameter called `<hop_slide_size>` which means window progresses this slide size every time. There are 3 cases:
18+
19+
1. `<hop_slide_size>` is less than `<hop_window_size>`. Hop windows have overlaps meaning an event can fall into several hop windows.
20+
2. `<hop_slide_size>` is equal to `<hop_window_size>`. Degenerated to a tumble window.
21+
3. `<hop_slide_size>` is greater than `<hop_window_size>`. Windows has a gap in between. Usually not useful, hence not supported so far.
22+
23+
Please note, at this point, you need to use the same time unit in `<hop_slide_size>` and `<hop_window_size>`, for example `hop(device_utils, 1s, 60s)` instead of `hop(device_utils, 1s, 1m)`.
24+
25+
Here is one hop window example which has 2 seconds slide and 5 seconds hop window.
26+
27+
```
28+
["2020-01-01 00:00:00", "2020-01-01 00:00:05]
29+
["2020-01-01 00:00:02", "2020-01-01 00:00:07]
30+
["2020-01-01 00:00:04", "2020-01-01 00:00:09]
31+
["2020-01-01 00:00:06", "2020-01-01 00:00:11]
32+
...
33+
```
34+
35+
Except that the hop window can have overlaps, other semantics are identical to the tumble window.
36+
37+
```sql
38+
SELECT device, max(cpu_usage)
39+
FROM hop(device_utils, 2s, 5s)
40+
GROUP BY device, window_end
41+
EMIT AFTER WINDOW CLOSE;
42+
```
43+
44+
The above example SQL continuously aggregates max cpu usage per device per hop window for stream `device_utils`. Every time a window is closed, Timeplus emits the aggregation results.
45+
46+
## Emit Policies
47+
48+
### EMIT AFTER WINDOW CLOSE {#emit_after}
49+
50+
You can omit `EMIT AFTER WINDOW CLOSE`, since this is the default behavior for time window aggregations. For example:
51+
52+
```sql
53+
SELECT device, max(cpu_usage)
54+
FROM tumble(device_utils, 5s)
55+
GROUP BY device, window_end
56+
```
57+
58+
The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results. How to determine the window should be closed? This is done by [Watermark](/understanding-watermark), which is an internal timestamp. It is guaranteed to be increased monotonically per stream query.
59+
60+
### EMIT AFTER WINDOW CLOSE WITH DELAY {#emit_after_with_delay}
61+
62+
Example:
63+
64+
```sql
65+
SELECT device, max(cpu_usage)
66+
FROM tumble(device_utils, 5s)
67+
GROUP BY device, widnow_end
68+
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
69+
```
70+
71+
The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `device_utils`. Every time a window is closed, Timeplus Proton waits for another 2 seconds and then emits the aggregation results.
72+
73+
### EMIT ON UPDATE {#emit_on_update}
74+
75+
You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
76+
77+
```sql
78+
SELECT
79+
window_start, cid, count() AS cnt
80+
FROM
81+
tumble(car_live_data, 5s)
82+
WHERE
83+
cid IN ('c00033', 'c00022')
84+
GROUP BY
85+
window_start, cid
86+
EMIT ON UPDATE
87+
```
88+
89+
During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
90+
91+
### EMIT ON UPDATE WITH DELAY {#emit_on_update_with_delay}
92+
93+
Adding the `WITH DELAY` to `EMIT ON UPDATE` will allow late event for the window aggregation.
94+
95+
```sql
96+
SELECT
97+
window_start, cid, count() AS cnt
98+
FROM
99+
tumble(car_live_data, 5s)
100+
WHERE
101+
cid IN ('c00033', 'c00022')
102+
GROUP BY
103+
window_start, cid
104+
EMIT ON UPDATE WITH DELAY 2s
105+
```
106+
107+
### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
108+
109+
You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
110+
```sql
111+
SELECT
112+
window_start, cid, count() AS cnt
113+
FROM
114+
tumble(car_live_data, 5s)
115+
WHERE
116+
cid IN ('c00033', 'c00022')
117+
GROUP BY
118+
window_start, cid
119+
EMIT ON UPDATE WITH BATCH 2s
120+
```

docs/jit.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Just-In-Time (JIT) compilation
1+
# Just-In-Time (JIT) Compilation
22

33
Starting from Timeplus Enterprise 2.9, the JIT compilation is enabled by default. For example, if you need to run the following SQL multiple times:
44
```sql

docs/session-aggregation.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Session Window Aggregation {#session_window}
2+
3+
This is similar to tumble and hop window. Please check the [session](/functions_for_streaming#session) function.

0 commit comments

Comments
 (0)