Commit 5c396a0

authored
Issue 514 refine hop window (#515)
1 parent dcfa0e3 commit 5c396a0

File tree

2 files changed

+135
-80
lines changed

docs/hop-aggregation.md

Lines changed: 122 additions & 66 deletions
@@ -1,120 +1,176 @@
1-
# Hop Window Aggregation {#hop_window}
1+
# Hop Window Aggregation
22

33
## Overview
44

5-
Like [Tumble](#tumble), Hop also slices the unbounded streaming data into smaller windows, and it has an additional sliding step.
5+
A **hop window** (also known as a **sliding window**) is a type of time-based window that allows data to be grouped into **overlapping segments**.
6+
7+
Each hop window is defined by two parameters:
8+
- **Window size** – the total duration of each window.
9+
- **Hop interval** – how often a new window starts.
10+
11+
Because hop windows can overlap, a single event can belong to multiple windows. This is useful when you want to generate **smoother, more continuous aggregations** (e.g., moving averages or rolling counts).
12+
13+
For example, with a 10-minute window size and a 5-minute hop interval, a new window starts every 5 minutes and spans 10 minutes of data, meaning there are always **two overlapping windows** active at any given time.
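
The overlap arithmetic can be sketched in a few lines of Python. This is an illustration only, not Timeplus code: the helper name `hop_windows_for` is hypothetical, and the sketch assumes window starts are aligned to multiples of the hop interval and that windows are half-open, `[start, end)`.

```python
def hop_windows_for(event_ts: int, window_size: int, hop_interval: int) -> list[tuple[int, int]]:
    """Return every [start, end) hop window that contains event_ts.

    Assumption (not taken from the Timeplus docs): window starts are aligned
    to multiples of hop_interval, and all values use the same unit (seconds).
    """
    if not 0 < hop_interval <= window_size:
        raise ValueError("hop_interval must be positive and <= window_size")
    # Latest window start at or before the event.
    start = (event_ts // hop_interval) * hop_interval
    windows = []
    # Walk backwards while the window still covers the event.
    while start > event_ts - window_size:
        windows.append((start, start + window_size))
        start -= hop_interval
    return sorted(windows)


# 10-minute window, 5-minute hop, event at minute 12 (values in seconds).
print(hop_windows_for(12 * 60, 10 * 60, 5 * 60))
```

Under these assumptions the event at minute 12 falls into the windows starting at minute 5 and minute 10. When the hop interval equals the window size, the same function returns exactly one window, which is the tumble case.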
14+
15+
16+
## Syntax
617

718
```sql
8-
SELECT <column_name1>, <column_name2>, <aggr_function>
9-
FROM hop(<stream_name>, [<timestamp_column>], <hop_slide_size>, [hop_windows_size], [<time_zone>])
19+
SELECT <grouping-keys>, <aggr-functions>
20+
FROM hop(<stream-name>, [<timestamp-column>], <hop-interval>, <window-size>)
1021
[WHERE clause]
11-
GROUP BY [<window_start | window_end>], ...
12-
EMIT <window_emit_policy>
13-
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
22+
GROUP BY [<window_start | window_end>], <other-group-keys> ...
23+
EMIT <emit-policy>
1424
```
1525

16-
Hop window is a more generalized window compared to tumble window. Hop window has an additional
17-
parameter called `<hop_slide_size>` which means window progresses this slide size every time. There are 3 cases:
26+
### Parameters
1827

19-
1. `<hop_slide_size>` is less than `<hop_window_size>`. Hop windows have overlaps meaning an event can fall into several hop windows.
20-
2. `<hop_slide_size>` is equal to `<hop_window_size>`. Degenerated to a tumble window.
21-
3. `<hop_slide_size>` is greater than `<hop_window_size>`. Windows has a gap in between. Usually not useful, hence not supported so far.
28+
- `<stream-name>` : the source stream the hop window applies to. **Required**
29+
- `<timestamp-column>` : the event timestamp column which is used to calculate window starts / ends and internal watermark. You can use `now()` or `now64(3)` to enable processing time hop window. Default is `_tp_time` if absent. **Optional**
30+
- `<hop-interval>` : how frequently new windows start (must be less than or equal to the window size). Supported interval units are listed below. **Required**
31+
- `s` : second
32+
- `m` : minute
33+
- `h` : hour
34+
- `d` : day
35+
- `w` : week
36+
- `<window-size>` : hop window interval size. Supported interval units are listed below. **Required**
37+
- `s` : second
38+
- `m` : minute
39+
- `h` : hour
40+
- `d` : day
41+
- `w` : week
2242

23-
Please note, at this point, you need to use the same time unit in `<hop_slide_size>` and `<hop_window_size>`, for example `hop(device_utils, 1s, 60s)` instead of `hop(device_utils, 1s, 1m)`.
43+
### Example
2444

25-
Here is one hop window example which has 2 seconds slide and 5 seconds hop window.
45+
The following query calculates the maximum CPU usage of each device in **10-second hop windows** that start every **4 seconds**.
2646

47+
```sql
48+
CREATE STREAM device_metrics (
49+
device string,
50+
cpu_usage float,
51+
event_time datetime64(3)
52+
);
53+
54+
SELECT
55+
window_start,
56+
window_end,
57+
device,
58+
max(cpu_usage)
59+
FROM hop(device_metrics, event_time, 4s, 10s)
60+
GROUP BY window_start, window_end, device;
2761
```
28-
["2020-01-01 00:00:00", "2020-01-01 00:00:05]
29-
["2020-01-01 00:00:02", "2020-01-01 00:00:07]
30-
["2020-01-01 00:00:04", "2020-01-01 00:00:09]
31-
["2020-01-01 00:00:06", "2020-01-01 00:00:11]
32-
...
33-
```
3462

35-
Except that the hop window can have overlaps, other semantics are identical to the tumble window.
63+
This allows you to track metrics like CPU usage in a rolling fashion, providing near-real-time insight into recent activity rather than discrete, non-overlapping periods.
64+
65+
## Emit Policies
66+
67+
Emit policies define **when** Timeplus should output results from **time-windowed** aggregations such as **tumble**, **hop**, **session** and **global-windowed** aggregation.
68+
69+
These policies control whether results are emitted **only after the window closes**, **after a delay** to honor late events, or **incrementally during the window**.
70+
71+
### `EMIT AFTER WINDOW CLOSE`
72+
73+
This is the **default behavior** for all time window aggregations. Timeplus emits the aggregated results once the window closes.
74+
75+
**Example**:
3676

3777
```sql
38-
SELECT device, max(cpu_usage)
39-
FROM hop(device_utils, 2s, 5s)
40-
GROUP BY device, window_end
41-
EMIT AFTER WINDOW CLOSE;
78+
SELECT window_start, device, max(cpu_usage)
79+
FROM hop(device_metrics, 4s, 10s)
80+
GROUP BY window_start, device;
4281
```
4382

44-
The above example SQL continuously aggregates max cpu usage per device per hop window for stream `device_utils`. Every time a window is closed, Timeplus emits the aggregation results.
45-
46-
## Emit Policies
83+
This query continuously computes the **maximum CPU usage** per device in every 10-second hop window from the stream `device_metrics`; a new hop window starts every 4 seconds.
84+
Each time a window closes (as determined by the internal watermark), Timeplus emits the results once.
85+
86+
:::info
87+
A watermark is an internal timestamp that advances monotonically per stream, determining when a window can be safely closed.
88+
:::
4789

48-
### EMIT AFTER WINDOW CLOSE {#emit_after}
90+
### `EMIT AFTER WINDOW CLOSE WITH DELAY`
4991

50-
You can omit `EMIT AFTER WINDOW CLOSE`, since this is the default behavior for time window aggregations. For example:
92+
Adds a **delay period** before emitting window results, allowing **late events** to be included.
93+
94+
**Example**:
5195

5296
```sql
53-
SELECT device, max(cpu_usage)
54-
FROM tumble(device_utils, 5s)
55-
GROUP BY device, window_end
97+
SELECT window_start, device, max(cpu_usage)
98+
FROM hop(device_metrics, 4s, 10s)
99+
GROUP BY window_start, device
100+
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
56101
```
57102

58-
The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `devices_utils`. Every time a window is closed, Timeplus Proton emits the aggregation results. How to determine the window should be closed? This is done by [Watermark](/understanding-watermark), which is an internal timestamp. It is guaranteed to be increased monotonically per stream query.
103+
This query aggregates the **maximum CPU usage** for each device per 10-second hop window, then waits 2 additional seconds after the window closes before emitting results. This helps capture any late-arriving events that fall within the window period.
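
The effect of the delay on late events can be modeled with a small Python sketch. It is a simplified model, not the Timeplus implementation: the function `is_accepted` is hypothetical, and it assumes the delay is measured against the watermark and that windows are half-open, `[window_start, window_end)`.

```python
def is_accepted(event_ts: int, watermark: int,
                window_start: int, window_end: int, delay: int = 0) -> bool:
    """True if an event still counts toward [window_start, window_end).

    Assumption for illustration: a window stops accepting events once the
    watermark reaches window_end + delay.
    """
    in_window = window_start <= event_ts < window_end
    still_open = watermark < window_end + delay
    return in_window and still_open


# An event stamped 9s arrives late, when the watermark is already at 11s.
print(is_accepted(9, 11, 0, 10))           # no delay: window [0, 10) already closed
print(is_accepted(9, 11, 0, 10, delay=2))  # 2s delay: window is still accepting
```

In this model the late event is dropped without a delay and included with `delay=2`, which is the trade-off the clause offers: results arrive later but are more complete.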
104+
105+
### `EMIT TIMEOUT`
59106

60-
### EMIT AFTER WINDOW CLOSE WITH DELAY {#emit_after_with_delay}
107+
In some streaming scenarios, the **last hop windows** might remain open because no new events arrive to advance the watermark (i.e., the event time progress).
108+
The **`EMIT TIMEOUT`** clause helps forcefully close such idle windows after a specified period of inactivity.
61109

62-
Example:
110+
**Example**:
63111

64112
```sql
65-
SELECT device, max(cpu_usage)
66-
FROM tumble(device_utils, 5s)
67-
GROUP BY device, widnow_end
68-
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
113+
SELECT window_start, device, max(cpu_usage)
114+
FROM hop(device_metrics, 4s, 10s)
115+
GROUP BY window_start, device
116+
EMIT TIMEOUT 3s;
69117
```
70118

71-
The above example SQL continuously aggregates max cpu usage per device per tumble window for the stream `device_utils`. Every time a window is closed, Timeplus Proton waits for another 2 seconds and then emits the aggregation results.
119+
In this example:
120+
121+
- The query continuously aggregates the maximum CPU usage per device in **10-second hop windows**.
122+
- If **no new events** arrive for **3 seconds**, Timeplus will **force-close** the most recent windows and emit the final results.
123+
- Once the window is closed, the **internal watermark** (event time) advances as well.
124+
- Any **late events** that belong to this now-closed window will be discarded.
125+
126+
### `EMIT ON UPDATE`
72127

73-
### EMIT ON UPDATE {#emit_on_update}
128+
Emits **intermediate aggregation updates** whenever the results change within an open window.
129+
This is useful for near real-time visibility into evolving metrics.
74130

75-
You can apply `EMIT ON UPDATE` in time windows, such as tumble/hop/session, with `GROUP BY` keys. For example:
131+
**Example**:
76132

77133
```sql
78134
SELECT
79-
window_start, cid, count() AS cnt
135+
window_start, device, max(cpu_usage)
80136
FROM
81-
tumble(car_live_data, 5s)
82-
WHERE
83-
cid IN ('c00033', 'c00022')
137+
hop(device_metrics, 4s, 10s)
84138
GROUP BY
85-
window_start, cid
86-
EMIT ON UPDATE
139+
window_start, device
140+
EMIT ON UPDATE;
87141
```
88142

89-
During the 5 second tumble window, even the window is not closed, as long as the aggregation value(`cnt`) for the same `cid` is different , the results will be emitted.
143+
Here, during each 10-second window, the system emits updates whenever there are new events flowing into the open window, even before the window closes.
90144

91-
### EMIT ON UPDATE WITH DELAY {#emit_on_update_with_delay}
145+
### `EMIT ON UPDATE WITH BATCH`
92146

93-
Adding the `WITH DELAY` to `EMIT ON UPDATE` will allow late event for the window aggregation.
147+
Combines **periodic emission** with **update-based** triggers.
148+
Timeplus checks the intermediate aggregation results at regular intervals and emits them only if they have changed, which can significantly improve emit efficiency and throughput compared with `EMIT ON UPDATE`.
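
A rough Python simulation shows why batching reduces output. It is only a sketch of the idea, not how Timeplus implements it: the function `emit_on_update_with_batch` and its input format are hypothetical, and it tracks a single running maximum rather than per-window, per-device state.

```python
def emit_on_update_with_batch(events, batch_interval):
    """Simulate batched update emission for a running max.

    events: (arrival_time, value) pairs sorted by arrival_time.
    Returns (check_time, current_max) pairs, one for each periodic check
    at which the max differs from the last emitted value.
    """
    emitted = []
    current_max = None
    last_emitted = None
    next_check = batch_interval
    for arrival, value in events:
        # Run every periodic check that is due before this event arrives.
        while arrival >= next_check:
            if current_max is not None and current_max != last_emitted:
                emitted.append((next_check, current_max))
                last_emitted = current_max
            next_check += batch_interval
        current_max = value if current_max is None else max(current_max, value)
    # One final check after the last event.
    if current_max is not None and current_max != last_emitted:
        emitted.append((next_check, current_max))
    return emitted


events = [(0.1, 10), (0.2, 30), (0.5, 20), (1.5, 25), (2.2, 40)]
print(emit_on_update_with_batch(events, 1))
```

With a 1-second batch interval, the five events produce two emissions in this model, while emitting on every change of the maximum would produce three (10, 30, 40).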
149+
150+
**Example**:
94151

95152
```sql
96153
SELECT
97-
window_start, cid, count() AS cnt
154+
window_start, device, max(cpu_usage)
98155
FROM
99-
tumble(car_live_data, 5s)
100-
WHERE
101-
cid IN ('c00033', 'c00022')
156+
hop(device_metrics, 4s, 10s)
102157
GROUP BY
103-
window_start, cid
104-
EMIT ON UPDATE WITH DELAY 2s
158+
window_start, device
159+
EMIT ON UPDATE WITH BATCH 1s;
105160
```
106161

107-
### EMIT ON UPDATE WITH BATCH {#emit_on_update_with_batch}
162+
### `EMIT ON UPDATE WITH DELAY`
163+
164+
Similar to **`EMIT ON UPDATE`**, but includes a delay to allow late events before emitting incremental updates.
165+
166+
**Example**:
108167

109-
You can combine `EMIT PERIODIC` and `EMIT ON UPDATE` together. In this case, even the window is not closed, Timeplus will check the intermediate aggregation result at the specified interval and emit rows if the result is changed.
110168
```sql
111169
SELECT
112-
window_start, cid, count() AS cnt
170+
window_start, device, max(cpu_usage)
113171
FROM
114-
tumble(car_live_data, 5s)
115-
WHERE
116-
cid IN ('c00033', 'c00022')
172+
hop(device_metrics, 4s, 10s)
117173
GROUP BY
118-
window_start, cid
119-
EMIT ON UPDATE WITH BATCH 2s
174+
window_start, device
175+
EMIT ON UPDATE WITH DELAY 2s;
120176
```

docs/tumble-aggregation.md

Lines changed: 13 additions & 14 deletions
@@ -12,17 +12,16 @@ This makes them simple, deterministic, and ideal for producing periodic reports
1212
## Syntax
1313

1414
```sql
15-
SELECT <grouping-keys>, <aggr_functions>
15+
SELECT <grouping-keys>, <aggr-functions>
1616
FROM tumble(<stream-name>, [<timestamp-column>], <window-size>)
1717
[WHERE clause]
1818
GROUP BY [window_start | window_end], <other-group-keys> ...
1919
EMIT <emit-policy>
20-
SETTINGS <key1>=<value1>, <key2>=<value2>, ...
2120
```
2221

2322
### Parameters
2423

25-
- `<stream-name>` : the source stream the tumble window applied to. **Required**
24+
- `<stream-name>` : the source stream the tumble window applies to. **Required**
2625
- `<timestamp-column>` : the event timestamp column which is used to calculate window starts / ends and internal watermark. You can use `now()` or `now64(3)` to enable processing time tumble window. Default is `_tp_time` if absent. **Optional**
2726
- `<window-size>` : tumble window interval size. Supported interval units are listed below. **Required**
2827
- `s` : second
@@ -35,26 +34,26 @@ SETTINGS <key1>=<value1>, <key2>=<value2>, ...
3534

3635
```
3736
CREATE STREAM device_metrics (
38-
device_id string,
37+
device string,
3938
cpu_usage float,
4039
event_time datetime64(3)
4140
);
4241
4342
SELECT
4443
window_start,
4544
window_end,
46-
device_id,
45+
device,
4746
avg(cpu_usage) AS avg_cpu
4847
FROM tumble(device_metrics, event_time, 5s)
4948
GROUP BY
5049
window_start,
51-
device_id
50+
device
5251
EMIT AFTER WINDOW CLOSE;
5352
```
5453

5554
**Explanation**:
5655
- Events are grouped into **5-second, non-overlapping windows** based on their `event_time`.
57-
- Each `device_id`’s events are aggregated independently within each window.
56+
- Each `device`’s events are aggregated independently within each window.
5857
- When a window closes, the system emits one aggregated result per device with the computed `avg_cpu`.
5958

6059
**Example timeline**:
@@ -83,11 +82,11 @@ This is the **default behavior** for all time window aggregations. Timeplus emit
8382

8483
```sql
8584
SELECT window_start, device, max(cpu_usage)
86-
FROM tumble(device_utils, 5s)
85+
FROM tumble(device_metrics, 5s)
8786
GROUP BY window_start, device;
8887
```
8988

90-
This query continuously computes the **maximum CPU usage** per device in every 5-second tumble window from the stream `device_utils`.
89+
This query continuously computes the **maximum CPU usage** per device in every 5-second tumble window from the stream `device_metrics`.
9190
Each time a window closes (as determined by the internal watermark), Timeplus emits the results once.
9291

9392
:::info
@@ -102,7 +101,7 @@ Adds a **delay period** before emitting window results, allowing **late events**
102101

103102
```sql
104103
SELECT window_start, device, max(cpu_usage)
105-
FROM tumble(device_utils, 5s)
104+
FROM tumble(device_metrics, 5s)
106105
GROUP BY window_start, device
107106
EMIT AFTER WINDOW CLOSE WITH DELAY 2s;
108107
```
@@ -118,7 +117,7 @@ The **`EMIT TIMEOUT`** clause helps forcefully close such idle windows after a s
118117

119118
```sql
120119
SELECT window_start, device, max(cpu_usage)
121-
FROM tumble(device_utils, 5s)
120+
FROM tumble(device_metrics, 5s)
122121
GROUP BY window_start, device
123122
EMIT TIMEOUT 3s;
124123
```
@@ -141,7 +140,7 @@ This is useful for near real-time visibility into evolving metrics.
141140
SELECT
142141
window_start, device, max(cpu_usage)
143142
FROM
144-
tumble(device_utils, 5s)
143+
tumble(device_metrics, 5s)
145144
GROUP BY
146145
window_start, device
147146
EMIT ON UPDATE;
@@ -160,7 +159,7 @@ Timeplus checks the intermediate aggregation results at regular intervals and em
160159
SELECT
161160
window_start, device, max(cpu_usage)
162161
FROM
163-
tumble(device_utils, 5s)
162+
tumble(device_metrics, 5s)
164163
GROUP BY
165164
window_start, device
166165
EMIT ON UPDATE WITH BATCH 1s;
@@ -176,7 +175,7 @@ Similar to **`EMIT ON UPDATE`**, but includes a delay to allow late events befor
176175
SELECT
177176
window_start, device, max(cpu_usage)
178177
FROM
179-
tumble(device_utils, 5s)
178+
tumble(device_metrics, 5s)
180179
GROUP BY
181180
window_start, device
182181
EMIT ON UPDATE WITH DELAY 2s;
