Commit 1d87100

Refine alerts and tasks (#507)
1 parent 7ad28ae commit 1d87100

File tree: 3 files changed (+132 −59 lines)

docs/alert.md

Lines changed: 106 additions & 38 deletions
# Alert

## Overview

A **Timeplus Alert** continuously monitors events in a source stream using a **streaming query**. When the specified conditions are met, it triggers a [Python UDF](/py-udf) to interact with external systems, for example by sending notifications to **Slack**, **email**, or other services.

Alerts are often used in combination with [Scheduled Tasks](/task) and [Materialized Views](/materialized-view) to form a complete, automated data pipeline, as illustrated below:

![AlertPipeline](/img/alert-pipeline.png)

## Create Alert

```sql
CREATE ALERT [IF NOT EXISTS] <db.alert-name>
BATCH <N> EVENTS WITH TIMEOUT <interval>
LIMIT <M> ALERTS PER <interval>
CALL <python-udf-name>
AS <streaming-select-query>;
```

:::info
The streaming select query in an Alert must be a simple `SELECT` statement that consumes data directly from a stream. Stateful operations such as **joins** or **aggregations** are not supported within Alerts.

If your use case requires joins, aggregations, or other complex logic, create a **Materialized View** first to perform those computations and materialize the results into a target stream. The Alert can then consume that pre-computed stream to trigger external actions.
:::
### `BATCH N EVENTS WITH TIMEOUT interval`

Defines how events are batched before invoking the alert action, improving efficiency and throughput. Batching can also help with alert suppression by reducing redundant or noisy alerts.

The alert action is triggered when either of the following conditions is met:

- The batch accumulates `N` events, or
- The specified timeout `interval` elapses.

**Example:**

```sql
BATCH 10 EVENTS WITH TIMEOUT 5s
```

In this example, the alert invokes the configured Python UDF as soon as either:

- 10 events are collected, or
- 5 seconds pass, whichever happens first.
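The batch-or-timeout trigger can be sketched as follows. This is an illustrative model only, not Timeplus internals: the `EventBatcher` name is hypothetical, and a real implementation would fire the timeout from a timer rather than checking it only when the next event arrives.

```python
import time

class EventBatcher:
    """Illustrative sketch (not Timeplus internals) of BATCH-with-TIMEOUT
    semantics: flush when N events accumulate or the timeout elapses."""

    def __init__(self, max_events, timeout_s, on_flush):
        self.max_events = max_events
        self.timeout_s = timeout_s
        self.on_flush = on_flush      # called with the list of batched events
        self.buffer = []
        self.deadline = None

    def add(self, event, now=None):
        now = time.monotonic() if now is None else now
        if not self.buffer:
            # first event of a new batch starts the timeout clock
            self.deadline = now + self.timeout_s
        self.buffer.append(event)
        if len(self.buffer) >= self.max_events or now >= self.deadline:
            self.flush()

    def flush(self):
        if self.buffer:
            self.on_flush(self.buffer)
            self.buffer = []
            self.deadline = None
```

With `EventBatcher(10, 5.0, call_udf)`, the hypothetical `call_udf` runs when the tenth event arrives or five seconds after the first buffered event, whichever comes first.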
### `LIMIT M ALERTS PER interval`

Defines alert suppression rules to prevent excessive notifications. This limits invocation of the configured Python UDF to at most `M` alerts within each specified `interval`.

**Example:**

```sql
LIMIT 1 ALERTS PER 10s
```

This example restricts the system to trigger no more than one alert every 10 seconds, even if multiple batches or events meet the alert conditions during that period.
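The suppression rule can be modeled as a fixed-window rate limiter. A minimal sketch under that assumption; `make_alert_limiter` is hypothetical, not a Timeplus API, and the actual windowing strategy may differ:

```python
import time

def make_alert_limiter(max_alerts, interval_s):
    """Illustrative sketch (not Timeplus internals) of LIMIT M ALERTS PER
    interval: allow at most max_alerts UDF invocations per fixed window."""
    state = {"window_start": None, "count": 0}

    def allow(now=None):
        now = time.monotonic() if now is None else now
        window = state["window_start"]
        if window is None or now - window >= interval_s:
            state["window_start"] = now   # start a fresh window
            state["count"] = 0
        if state["count"] < max_alerts:
            state["count"] += 1
            return True                   # fire the alert
        return False                      # suppressed

    return allow
```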
### `CALL python-udf-name`

Specifies the **Python UDF** to invoke when events are emitted from the streaming query and both the **batching** and **suppression** conditions are met.

The Python UDF must have a function signature (input parameters) that matches the projection output of the streaming query (the `SELECT` clause).

:::info
Currently, only **Python UDFs** are supported.
:::
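As a hypothetical illustration of signature matching (the column names and function below are invented): for a query projecting two columns, such as `SELECT actor, repo FROM ...`, the UDF takes one parameter per projected column, and, as with regular Timeplus Python UDFs, each parameter carries an array of values for the whole batch rather than a single event.

```python
# Hypothetical UDF body for a query projecting `actor` and `repo`:
# one parameter per projected column, each an array covering the batch.
def my_alert_action(actor, repo):
    messages = [f"{a} starred {r}" for a, r in zip(actor, repo)]
    # here you would call an external system (Slack, email, ...)
    return messages  # the alert ignores the UDF's return value
```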
### Checkpoint

Timeplus automatically checkpoints the sequence numbers (offsets) of the source stream. This ensures that upon recovery, duplicate alerts are not triggered.

:::info
Alert checkpoints are stored in the `system.alert_ckpt_log` **Mutable Stream**, with the checkpointed sequence number in the `_tp_sn` column.
:::
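The recovery guarantee can be pictured as follows; this is an illustrative sketch only, and the class and fields are hypothetical, not how Timeplus stores checkpoints:

```python
class AlertCheckpoint:
    """Illustrative sketch (not Timeplus internals) of sequence-number
    checkpointing: after a restart, events at or below the last
    checkpointed sequence number are skipped, so alerts are not re-fired."""

    def __init__(self, last_sn=-1):
        self.last_sn = last_sn  # e.g. restored from a checkpoint log

    def should_process(self, sn):
        return sn > self.last_sn

    def commit(self, sn):
        # record progress after the alert action succeeds
        self.last_sn = max(self.last_sn, sn)
```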
### Example

```sql
CREATE FUNCTION send_star_events_to_slack(actor string)
RETURNS string
LANGUAGE PYTHON AS $$
import json
import requests

def send_star_events_to_slack(value):
    for github_id in value:
        requests.post(
            "https://hooks.slack.com/services/T123/B456/other_id",
            data=json.dumps({
                "text": f"New 🌟 for Timeplus Proton from https://github.com/{github_id}"
            })
        )
    return value
$$

CREATE ALERT default.watch_event_alert
BATCH 10 EVENTS WITH TIMEOUT 5s
LIMIT 1 ALERTS PER 15s
CALL send_star_events_to_slack
AS
SELECT actor
FROM github_events
WHERE repo = 'timeplus-io/proton' AND type = 'WatchEvent';
```

**Explanation:**

- The alert continuously monitors the `github_events` stream in the background.
- When a new `WatchEvent` occurs for the GitHub repository `timeplus-io/proton`, the Python UDF `send_star_events_to_slack` is triggered.
- The UDF posts a formatted message to a Slack webhook, notifying the team of new stars.
- To avoid alert flooding, the alert is rate-limited to at most one alert every 15 seconds.
- The alert also batches events for efficiency: the UDF is invoked when either 10 events accumulate or 5 seconds pass, whichever comes first.
## List Alerts

```sql
SHOW ALERTS [FROM db] [SETTINGS verbose=true]
```

Without `SETTINGS verbose=true`, this lists each alert's name and UUID. With `SETTINGS verbose=true`, the following columns are added: `version`, `last_modified`, `last_modified_by`, `created`, and `created_by`.

## Show Alert

```sql
SHOW CREATE ALERT <db.alert-name> [SETTINGS show_multi_versions=true]
```

## Drop Alert

```sql
DROP ALERT <db.alert-name>
```

docs/task.md

Lines changed: 26 additions & 21 deletions
# Task

## Overview

A **Timeplus Scheduled Task** runs a historical query periodically according to its schedule and persists the query results to a target Timeplus native stream or external system (e.g., ClickHouse). When combined with Python UDFs, scheduled tasks can move data between external systems and Timeplus, or between different external systems.

Scheduled tasks complement **Timeplus Materialized Views**, which run streaming queries and continuously materialize results to target streams or external systems. Tasks are often used in combination with [Alerts](/alert) and [Materialized Views](/materialized-view) to form a complete, automated data pipeline, as illustrated below:

![AlertPipeline](/img/alert-pipeline.png)

## Create Task

```sql
...
AS
<Historical SELECT query>;
```

### `SCHEDULE interval`

The interval at which the task runs. Tasks are scheduled via a centralized scheduler to prevent overlap: the next run starts only after the previous run completes.

### `TIMEOUT interval`

The maximum allowed execution time for the task. If the task exceeds this interval, the scheduler aborts it to prevent indefinite execution.

Once created, a task is automatically scheduled in the Timeplus cluster. The scheduler selects the best candidate node in the cluster to execute the task.
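The no-overlap rule means a slow run pushes back the next one. A small sketch of that scheduling behavior; the helper `plan_runs` is hypothetical and purely illustrative, since real runs are driven by the cluster scheduler:

```python
def plan_runs(interval_s, durations):
    """Illustrative sketch of non-overlapping scheduling: a run is due
    every interval_s seconds, but never starts before the previous run
    finishes. Returns the start time of each run."""
    starts, prev_end = [], 0.0
    for i, duration in enumerate(durations):
        due = i * interval_s
        start = max(due, prev_end)  # wait for the previous run to complete
        starts.append(start)
        prev_end = start + duration
    return starts
```

For example, with a 5s schedule and run durations of 2s, 8s, and 1s, the second run starts on time at t=5, but the third is delayed to t=13 because run two only finishes then.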

### Example

To periodically collect the Timeplus node statuses and persist the results into a target stream:

```sql
...
AS
SELECT cluster_id, node_id, node_state FROM system.cluster;
```

The `node_states` stream will be populated every 5 seconds with the current status of cluster nodes.

## List Tasks

```sql
-- List all tasks
SHOW TASKS;
```

## Show Task

```sql
-- Show DDL definition of a task
SHOW CREATE TASK <db.task-name>;
```

## Drop Task

```sql
DROP TASK <db.task-name>;
```

## Pause Task

```sql
SYSTEM PAUSE TASK <db.task-name>;
```

## Resume Task

```sql
SYSTEM RESUME TASK <db.task-name>;
```

static/img/alert-pipeline.png

93.4 KB