Commit 39938ea

Merge pull request #14 from wp-labs/feature/postgres
Add the configuration description document for the postgres sink
2 parents 45e98f5 + 29248d7 commit 39938ea

4 files changed

Lines changed: 138 additions & 0 deletions

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
# PostgreSQL Sink

The PostgreSQL sink writes parsed records into PostgreSQL tables. It builds `INSERT IGNORE`-style statements from `columns` (PostgreSQL has no `INSERT IGNORE` keyword; its native equivalent is `INSERT ... ON CONFLICT DO NOTHING`), which suits idempotent retries. It accepts only Record data; raw input is not supported.

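
The statement-building step described above can be sketched as follows. This is a minimal illustration, not the sink's actual implementation: the function name `build_insert` is hypothetical, the `%s` placeholder style is an assumption, and the sketch assumes the idempotent insert is expressed with PostgreSQL's `ON CONFLICT DO NOTHING`. Identifiers are double-quoted because column names such as `http/request` are not valid unquoted identifiers in PostgreSQL.

```python
from __future__ import annotations


def build_insert(table: str, columns: list[str]) -> str:
    """Build an idempotent INSERT statement from an ordered column list.

    Hypothetical helper: assumes ON CONFLICT DO NOTHING is the idempotent form.
    """
    if not columns:
        raise ValueError("columns must not be empty")

    def quote(ident: str) -> str:
        # Double-quote the identifier; embedded double quotes are doubled.
        return '"' + ident.replace('"', '""') + '"'

    col_list = ", ".join(quote(c) for c in columns)
    # One placeholder per column, in the same order as `columns`.
    placeholders = ", ".join(["%s"] * len(columns))
    return (
        f"INSERT INTO {quote(table)} ({col_list}) "
        f"VALUES ({placeholders}) ON CONFLICT DO NOTHING"
    )
```

Note that `ON CONFLICT DO NOTHING` only skips rows that violate a unique or primary-key constraint, so retries are idempotent only if the target table defines one (for example on `wp_event_id`, which is an assumption about the schema).
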
## Connector Definition

Use the built-in template (located at `connectors/sink.d/50-mysql.toml`; PostgreSQL uses the same configuration keys as MySQL):

```toml
[[connectors]]
id = "postgresql_sink"
type = "postgresql"
allow_override = ["endpoint", "username", "password", "database", "table", "columns", "batch_size"]

[connectors.params]
endpoint = "localhost:5432"
username = "postgres"
password = "123456"  # placeholder; replace with your own credentials
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
```

## Parameters

| Parameter | Type | Description |
|------|------|------|
| `endpoint` | string | PostgreSQL endpoint (`host:port`, required) |
| `username` | string | Username (optional, default `postgres`) |
| `password` | string | Password (optional) |
| `database` | string | Target database (required) |
| `table` | string | Target table name (required) |
| `columns` | array | Column list that defines the write order (required) |
| `batch_size` | int | Number of records per batch insert (optional) |

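
The required/optional rules in the table above can be checked with a small validation step. The sketch below is illustrative only: `normalize_params` and the `host`/`port` output keys are hypothetical names, and it assumes a plain `host:port` endpoint (no IPv6 literals).

```python
from __future__ import annotations

# Parameters the table marks as required.
REQUIRED = ("endpoint", "database", "table", "columns")


def normalize_params(params: dict) -> dict:
    """Validate sink parameters and apply the documented default username."""
    missing = [key for key in REQUIRED if not params.get(key)]
    if missing:
        raise ValueError(f"missing required parameter(s): {', '.join(missing)}")

    # Split on the last colon so the port is always the trailing component.
    host, sep, port = params["endpoint"].rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError("endpoint must look like host:port")

    out = dict(params)
    out.setdefault("username", "postgres")  # documented default
    out["host"], out["port"] = host, int(port)
    return out
```
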
## Configuration Example

### Basic Usage

```toml
version = "2.0"

[sink_group]
name = "all"
oml = ["/*"]
batch_timeout_ms = 5000  # Flush automatically if the batch does not fill within this window (ms)
parallel = 8

[[sink_group.sinks]]
name = "main"
connect = "postgresql_sink"

[sink_group.sinks.params]
endpoint = "localhost:5432"
username = "postgres"
password = "123456"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
```

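
The interaction between `batch_size` and `batch_timeout_ms` in the example above can be pictured with the following sketch. It is not the sink's real batching code: the `Batcher` class, its `tick` method, and the choice to start the timeout window at the first buffered record are all assumptions made for illustration.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class Batcher:
    """Buffer rows and flush on size, or on timeout for a partial batch."""

    def __init__(
        self,
        batch_size: int,
        batch_timeout_ms: int,
        flush: Callable[[list], Any],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._batch_size = batch_size
        self._timeout_ms = batch_timeout_ms
        self._flush = flush
        self._clock = clock
        self._rows: list = []
        self._first_at: float | None = None  # time the first buffered row arrived

    def add(self, row: Any) -> None:
        if not self._rows:
            self._first_at = self._clock()
        self._rows.append(row)
        # Size trigger: flush as soon as the batch is full.
        if len(self._rows) >= self._batch_size:
            self._flush_now()

    def tick(self) -> None:
        """Call periodically; flushes a partial batch once the timeout elapses."""
        if self._rows and (self._clock() - self._first_at) * 1000 >= self._timeout_ms:
            self._flush_now()

    def _flush_now(self) -> None:
        rows, self._rows, self._first_at = self._rows, [], None
        self._flush(rows)
```

With `batch_size = 1024` and `batch_timeout_ms = 5000`, a steady stream flushes every 1024 records, while a trickle of records is written at most about five seconds after the first one is buffered.
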
## Notes

- Field names in `columns` must match the OML output fields. Columns with no matching value are written as `NULL`.
- You can override the connection string with the `POSTGRESQL_URL` environment variable (format: `postgresql://user:pass@host:port/db`).
- PostgreSQL uses the same configuration keys as MySQL. Only the connector `type`, port, and connection details differ.
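
The first two notes can be illustrated with a short sketch. Both helpers are hypothetical (`record_to_row` and `connection_url` are not names from the project); the sketch assumes a record is a flat mapping of field name to value and that `POSTGRESQL_URL`, when set, takes precedence over the configured parameters.

```python
from __future__ import annotations

import os
from typing import Mapping
from urllib.parse import quote


def record_to_row(record: Mapping, columns: list[str]) -> tuple:
    """Order values by `columns`; a missing field becomes None (SQL NULL)."""
    return tuple(record.get(name) for name in columns)


def connection_url(params: Mapping, env: Mapping = os.environ) -> str:
    """Return POSTGRESQL_URL if set, else build a URL from the sink params."""
    override = env.get("POSTGRESQL_URL")
    if override:
        return override
    # Percent-encode credentials so characters like '@' or ':' stay unambiguous.
    user = quote(params.get("username", "postgres"), safe="")
    password = params.get("password")
    auth = user if password is None else f"{user}:{quote(password, safe='')}"
    return f"postgresql://{auth}@{params['endpoint']}/{params['database']}"
```

For example, a record without a `timestamp` field yields `None` in that position, which a database driver would typically send as `NULL`.
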

docs-en/10-user/05-connectors/02-sinks/README.md

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ This guide introduces how to configure and use various data outputs (Sinks) in t
 | `syslog` | Output to Syslog server (UDP/TCP) | [Syslog Sink Configuration](./13-syslog_sink.md) |
 | `tcp` | Output to TCP server | [TCP Sink Configuration](./15-tcp_sink.md) |
 | `kafka` | Output to Kafka | - |
+| `postgresql` | Output to PostgreSQL | [PostgreSQL Sink Configuration](./23-postgresql_sink.md) |
 | `doris` | Output to Doris | [Doris Sink Configuration](./17-doris.md) |
 | `elasticsearch` | Output to Elasticsearch | [Elasticsearch Sink Configuration](./20-elasticsearch_sink.md) |
 | `prometheus` | Prometheus metrics exposure | [Prometheus Sink Configuration](./14-prometheus_sink.md) |
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
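# PostgreSQL Sink

The PostgreSQL sink writes parsed records into PostgreSQL tables. It generates `INSERT IGNORE`-style statements from `columns` (PostgreSQL has no `INSERT IGNORE` keyword; its native equivalent is `INSERT ... ON CONFLICT DO NOTHING`), which suits idempotent retry scenarios. It accepts only Record data (raw input is not supported).
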
## Connector Definition

We recommend using the template bundled with the repository (located at `connectors/sink.d/50-mysql.toml`; the configuration keys are kept consistent with MySQL):

```toml
[[connectors]]
id = "postgresql_sink"
type = "postgresql"
allow_override = ["endpoint", "username", "password", "database", "table", "columns", "batch_size"]

[connectors.params]
endpoint = "localhost:5432"
username = "postgres"
password = "123456"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
```

## Available Parameters

| Parameter | Type | Description |
|------|------|------|
| `endpoint` | string | PostgreSQL address (`host:port`, required) |
| `username` | string | Username (optional, default `postgres`) |
| `password` | string | Password (optional) |
| `database` | string | Target database (required) |
| `table` | string | Target table name (required) |
| `columns` | array | List of column names; determines the order in which fields are written (required) |
| `batch_size` | int | Number of records per batch write (optional) |

## Configuration Example

### Basic Usage

```toml
version = "2.0"

[sink_group]
name = "all"
oml = ["/*"]
batch_timeout_ms = 5000  # Insert automatically when the batch size is not reached within this time window
parallel = 8

[[sink_group.sinks]]
name = "main"
connect = "postgresql_sink"

[sink_group.sinks.params]
endpoint = "localhost:5432"
username = "postgres"
password = "123456"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
```

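## Notes

- Field names in `columns` must match the OML output fields; missing table fields are written as `NULL`.
- The connection string can be overridden with the `POSTGRESQL_URL` environment variable (format: `postgresql://user:pass@host:port/db`).
- PostgreSQL's configuration keys are exactly the same as MySQL's; only the connector `type`, port, and connection details differ.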

docs-zh/10-user/05-connectors/02-sinks/README.md

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@
 | `tcp` | Output to TCP server | [TCP Sink Configuration](./15-tcp_sink.md) |
 | `kafka` | Output to Kafka | [Kafka Sink Configuration](./18-kafka_sink.md) |
 | `mysql` | Output to MySQL | [MySQL Sink Configuration](./19-mysql_sink.md) |
+| `postgresql` | Output to PostgreSQL | [PostgreSQL Sink Configuration](./23-postgresql_sink.md) |
 | `doris` | Output to Doris | [Doris Sink Configuration](./17-doris.md) |
 | `elasticsearch` | Output to Elasticsearch | [Elasticsearch Sink Configuration](./20-elasticsearch_sink.md) |
 | `prometheus` | Prometheus metrics exposure | [Prometheus Sink Configuration](./14-prometheus_sink.md) |
