Skip to content

Commit 9e33d31

Browse files
[server][client][flink] Add retract support for Aggregation Merge Engine
1 parent 0e265ad commit 9e33d31

81 files changed

Lines changed: 6448 additions & 582 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/superpowers/plans/2026-03-28-retract-architecture-redesign.md

Lines changed: 561 additions & 0 deletions
Large diffs are not rendered by default.

docs/superpowers/plans/2026-03-28-retract-bugfix.md

Lines changed: 431 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
# Retract Architecture Redesign
2+
3+
## 概述
4+
5+
重新设计 Aggregation Merge Engine 的 retract 支持架构,解决三个架构级缺陷:
6+
7+
1. **AD-1 隐式配对协议** — 当前 retract+upsert 的配对关系在 wire protocol 中没有显式标记,server 依赖位置启发式配对,unpaired retract 会静默损坏数据
8+
2. **AD-2 Checkpoint 活性风险** — Flink 层缓冲 pending retract,checkpoint 时未匹配则抛异常,反压下可能进入 checkpoint 失败循环
9+
3. **AD-3 MergeMode 前向兼容性**`MergeMode.fromValue()` 对未知值抛 uncaught 异常
10+
11+
## 设计原则
12+
13+
- **每条 record 自描述** — per-record `MutationType` 枚举显式标识 UPSERT / DELETE / RETRACT
14+
- **每条 record 独立处理** — server 端不依赖配对,每条 record 都能独立生成完整 CDC
15+
- **配对是优化,不是正确性依赖** — batch 内相邻同 key retract+upsert 合并处理跳过 intermediate 值,减少 CDC 条数
16+
- **Flink 无状态** — 收到 -U 立即发送 RETRACT record,收到 +U 立即发送 UPSERT record,不缓冲
17+
18+
## 约束
19+
20+
- 不破坏 main 分支已有协议的升级兼容性
21+
- retract 功能仅支持 aggregation merge engine 且整表所有非 PK 聚合列均支持 retract 的表
22+
- 不满足条件的表必须维持过去的语义
23+
24+
---
25+
26+
## 一、Wire Format 变更
27+
28+
### 1.1 PUT_KV API Version 2 的 KvRecord 新格式
29+
30+
```
31+
V0/V1 (现有): Length(4B) + KeyLength(varint) + Key + Row
32+
V2 (新增): Length(4B) + MutationType(1B) + KeyLength(varint) + Key + Row
33+
```
34+
35+
注:当前 retract 分支已定义了 PUT_KV V2(用于 request-level `RETRACT_THEN_AGGREGATE`),但该定义仅存在于未合入的 feature 分支,从未发布。本设计重新定义 V2 语义为 per-record MutationType 格式。
36+
37+
### 1.2 MutationType 枚举
38+
39+
新增 `org.apache.fluss.record.MutationType` 枚举(`fluss-common` 模块,与 `DefaultKvRecord` 同包):
40+
41+
|| 名称 | 语义 |
42+
|----|------|------|
43+
| 0 | UPSERT | 普通 upsert 记录 |
44+
| 1 | DELETE | 显式删除记录 |
45+
| 2 | RETRACT | 撤回记录,携带要撤回的旧值 |
46+
47+
`MutationType` 是 per-record 字段,存在于 KvRecord body 内部。它与 batch/request 级别的 `MergeMode`(DEFAULT/OVERWRITE)是完全独立的命名空间,两者互不干扰。
48+
49+
### 1.3 DELETE 语义在 V2 中的定义
50+
51+
V0/V1 中 DELETE 通过隐式约定表达:Row 为 null(value length = 0)即 DELETE。V2 引入显式 `MutationType.DELETE` 后,两者的关系如下:
52+
53+
- V2 DELETE 记录:`MutationType=DELETE`,Row 必须为 null。如果 Row 非 null,server 忽略 Row 值(防御性处理)。
54+
- V2 UPSERT 记录:`MutationType=UPSERT`,Row 为 null 视为 `InvalidRecordException`(V2 中 UPSERT 必须携带值)。
55+
- V2 RETRACT 记录:`MutationType=RETRACT`,Row 为 null 视为 `InvalidRecordException`(RETRACT 必须携带要撤回的旧值)。
56+
- V0/V1 记录:行为不变,Row 为 null 仍然表示 DELETE。
57+
58+
### 1.4 MergeMode 回滚
59+
60+
- 删除 `RETRACT_THEN_AGGREGATE` 枚举值
61+
- `MergeMode` 保持只有 `DEFAULT``OVERWRITE`
62+
- `PutKvRequest` 中的 `agg_mode` 字段保持原样(用于 DEFAULT/OVERWRITE 区分)
63+
- Retract 概念完全下沉到 per-record `MutationType`,不再污染 batch 级别
64+
65+
### 1.5 版本门控
66+
67+
- PUT_KV v0/v1:server 用 V0 record 格式解析(无 MutationType 字节)
68+
- PUT_KV v2:server 用 V2 record 格式解析(有 MutationType 字节)
69+
- API 版本协商(`ServerApiVersions`)保证老 server 永远不会收到 v2 请求
70+
71+
### 1.6 Batch 级别
72+
73+
- Batch header `Attributes` 字节不变(仍为 0)
74+
- Batch `magic` 不变(仍为 V0)
75+
- 同一个 batch 内可自由混合 UPSERT、DELETE、RETRACT 记录
76+
77+
### 1.7 DefaultKvRecord V2 实现
78+
79+
**写入(`DefaultKvRecord.writeTo`):**
80+
81+
新增 V2 重载方法:
82+
83+
```java
84+
public static int writeToV2(OutputView outputView, MutationType mutationType,
85+
byte[] key, @Nullable BinaryRow row) throws IOException {
86+
// 写 Length(占位)
87+
// 写 MutationType(1 byte)
88+
// 写 KeyLength + Key + Row(同 V0)
89+
}
90+
```
91+
92+
原有 `writeTo(OutputView, byte[], BinaryRow)` 保持不变,用于 V0/V1。
93+
94+
**读取(`DefaultKvRecord.readFrom`):**
95+
96+
新增 V2 重载或通过 `ReadContext` 传递 API version:
97+
98+
```java
99+
public static KvRecord readFromV2(MemorySegment segment, int position,
100+
short schemaId, ReadContext readContext) {
101+
// 读 Length
102+
// 读 MutationType(1 byte)
103+
// 读 KeyLength + Key + Row(同 V0)
104+
}
105+
```
106+
107+
**`KvRecord` 接口新增:**
108+
109+
```java
110+
/** 获取 record 的 mutation 类型。V0/V1 record 默认返回 UPSERT(Row 非 null)或 DELETE(Row 为 null)。 */
111+
default MutationType getMutationType() {
112+
return getRow() == null ? MutationType.DELETE : MutationType.UPSERT;
113+
}
114+
```
115+
116+
**API version 传递路径:**
117+
118+
RPC handler(`TabletService.putKv`)已知请求的 API version → 传递给 `KvTablet.putAsLeader()` → 传递给 `processKvRecords()` → 构造 `ReadContext` 时携带 version 信息 → record iterator 根据 version 选择 V0 或 V2 解析。
119+
120+
### 1.8 Protobuf 变更
121+
122+
`PutKvRequest` protobuf message 本身不变。V2 格式变更完全在序列化的 KvRecordBatch payload 内部。`MergeMode` proto enum 中的 `RETRACT_THEN_AGGREGATE(2)` 从 feature 分支回滚删除(该值从未发布到 main)。
123+
124+
---
125+
126+
## 二、Server 端处理逻辑
127+
128+
### 2.1 核心原则
129+
130+
每条 record 独立处理,配对是纯优化。
131+
132+
### 2.2 Retract 正确性契约
133+
134+
Retract 正确性依赖上游(Flink)发送的 retract 值与之前聚合的值精确匹配。Server 不校验 retract 值是否与历史贡献一致 — 添加校验需要存储 per-key 贡献历史,违背聚合引擎的设计初衷(避免大状态)。如果上游发送了不匹配的 retract 值,聚合状态会被静默损坏。
135+
136+
### 2.3 统一处理流程
137+
138+
`KvTablet.processKvRecords()` 统一处理所有 MutationType:
139+
140+
```
141+
遍历 batch 中的 records:
142+
mutationType = record.getMutationType()
143+
144+
switch (mutationType):
145+
case UPSERT:
146+
old = read(key)
147+
new = merge(old, value)
148+
if old == null:
149+
生成 +I(new)
150+
else if new.equals(old):
151+
跳过(no-change 优化)
152+
else:
153+
生成 UB(old) + UA(new)
154+
155+
case DELETE:
156+
现有逻辑不变
157+
158+
case RETRACT:
159+
old = read(key)
160+
if old == null:
161+
跳过(retract 不存在的 key,无操作)
162+
else:
163+
// 合并优化:peek 下一条
164+
if hasNext() && next.key == key && next.mutationType == UPSERT:
165+
upsertRecord = consumeNext()
166+
// 两步操作:先 retract 再 merge(复用现有 RowMerger 接口)
167+
intermediate = currentMerger.retract(old, retractVal)
168+
new = currentMerger.merge(intermediate, upsertVal)
169+
if new.equals(old):
170+
跳过
171+
else:
172+
生成 UB(old) + UA(new) // 2条,跳过 intermediate
173+
else:
174+
// 独立 retract
175+
intermediate = currentMerger.retract(old, retractVal)
176+
if intermediate == null:
177+
deleteBehavior = currentMerger.deleteBehavior()
178+
if deleteBehavior == IGNORE:
179+
跳过
180+
else if deleteBehavior == DISABLE:
181+
throw DeletionDisabledException
182+
else: // ALLOW
183+
apply delete, 生成 -D(old)
184+
else if intermediate.equals(old):
185+
跳过
186+
else:
187+
生成 UB(old) + UA(intermediate)
188+
```
189+
190+
注:合并优化路径中的 `retract` + `merge` 是对现有 `RowMerger.retract()``RowMerger.merge()` 的顺序调用,不引入新的 `retractThenMerge` 方法。
191+
192+
### 2.4 错误处理
193+
194+
- 收到 RETRACT 但 `RowMerger.supportsRetract()=false` → 抛 `InvalidRecordException`
195+
- 收到 RETRACT 但表不是 aggregation merge engine → 抛 `InvalidRecordException`
196+
- `MutationType` 未知值 → 抛 `InvalidRecordException`(显式拒绝,不静默)
197+
198+
### 2.5 删除的 server 代码
199+
200+
- `processRetractThenAggregateRecords()` 整个方法
201+
- `processRetractThenAggregate()` 整个方法
202+
- `MergeMode` 分发逻辑(`if mergeMode == RETRACT_THEN_AGGREGATE`
203+
204+
---
205+
206+
## 三、Flink 层变更
207+
208+
### 3.1 UpsertSinkWriter 简化
209+
210+
```java
211+
// 之前:缓冲 -U,等 +U 配对,调用 retractThenUpsert()
212+
// 之后:
213+
case RETRACT:
214+
upsertWriter.retract(row); // 直接发送,无缓冲
215+
case UPSERT:
216+
upsertWriter.upsert(row); // 直接发送,无配对检查
217+
```
218+
219+
删除:
220+
- `pendingRetractRows` HashMap 及所有相关逻辑
221+
- `flush()` 中的 unmatched retract 检查和 `IOException`
222+
- `close()` 中的 orphaned retract warn 逻辑
223+
- `writeRow()` 中的 RETRACT/UPSERT 配对匹配分支
224+
225+
### 3.2 RowDataSerializationSchema.toOperationType()
226+
227+
分支逻辑不变,RETRACT 分支的下游行为变化(直接发送 vs 缓冲配对):
228+
229+
```
230+
UPDATE_BEFORE:
231+
if ignoreDelete → IGNORE(现有语义,非 agg 表的逃生通道)
232+
if schemaSupportsRetract → RETRACT(直接映射,不再缓冲)
233+
if isAggregationTable → fail-fast(agg 表但有不支持 retract 的函数)
234+
else → DELETE(非 agg PK 表的现有语义)
235+
```
236+
237+
### 3.3 不变的部分
238+
239+
- `FlinkTableSink.getChangelogMode()` — 不变
240+
- `FlinkConversions.computeSchemaSupportsRetract()` — 不变
241+
- 不满足 retract 条件的表 — 行为完全不变
242+
243+
### 3.4 删除的 Flink 代码
244+
245+
- `UpsertWriter.retractThenUpsert()` 接口方法
246+
- `UpsertWriterImpl.retractThenUpsert()` 实现
247+
- `UpsertSinkWriter` 中所有 checkpoint 相关的 retract 逻辑
248+
249+
---
250+
251+
## 四、Client 层变更
252+
253+
### 4.1 WriteRecord
254+
255+
- `mergeMode` 字段保留,但仅用于 `DEFAULT``OVERWRITE``RETRACT_THEN_AGGREGATE` 回滚删除)
256+
- 新增 `mutationType` 字段(`MutationType` 枚举)
257+
- `WriteRecord.forUpsert()` → mutationType=UPSERT
258+
- `WriteRecord.forDelete()` → mutationType=DELETE
259+
- 新增 `WriteRecord.forRetract()` → mutationType=RETRACT
260+
261+
### 4.2 KvWriteBatch
262+
263+
- 删除 `tryAppendPair()` 方法
264+
- `tryAppend()` 正常工作,retract 和 upsert 记录自由混合
265+
- batch-level `mergeMode` 保留(用于 DEFAULT/OVERWRITE 区分)
266+
267+
### 4.3 KvRecordBatchBuilder
268+
269+
- 删除 `hasRoomForPair()`
270+
- `append()` 方法新增 `mutationType` 参数,PUT_KV v2 时写入 1-byte MutationType 到 record 头部
271+
272+
### 4.4 RecordAccumulator
273+
274+
- 删除 `appendPair()` / `appendNewBatchPair()` 及所有 pair 相关逻辑
275+
- `append()` 正常工作,retract record 和 upsert record 走同一条路径
276+
- 不再需要按 MergeMode 拆分 batch(retract 和 upsert 可混合在同一 batch)
277+
278+
### 4.5 WriterClient
279+
280+
- 删除 `sendPair()` / `doSendPair()`
281+
- 删除 `AbstractTableWriter.sendPairWithResult()`
282+
- `send()` 正常工作,retract 就是一条普通的 WriteRecord
283+
284+
### 4.6 UpsertWriter 接口
285+
286+
- 删除 `retractThenUpsert(InternalRow retractRow, InternalRow upsertRow)`
287+
- 新增 `retract(InternalRow row)` — 内部调用 `send(WriteRecord.forRetract(...))`
288+
289+
---
290+
291+
## 五、向后兼容性
292+
293+
| 场景 | 行为 |
294+
|------|------|
295+
| 新 client → 老 server | API 版本协商降级到 v1,不发送 RETRACT record,retract 功能不可用 |
296+
| 老 client → 新 server | server 按 v0/v1 格式解析,无 MutationType 字节,行为不变 |
297+
| 新 client → 新 server | PUT_KV v2,完整 retract 支持 |
298+
299+
**滚动升级:** 客户端按 per-server 协商 API 版本。滚动升级期间,部分 tablet server 可能仍为旧版本。客户端对已升级 server 使用 V2(支持 retract),对未升级 server 降级到 V1(retract 不可用)。建议在升级文档中要求所有 tablet server 升级完成后再启用 retract 功能。
300+
301+
---
302+
303+
## 六、删除代码汇总
304+
305+
| 模块 | 删除项 |
306+
|------|--------|
307+
| fluss-common | `MergeMode.RETRACT_THEN_AGGREGATE` 枚举值;`MergeMode` 相关的所有新增变更回滚到 main |
308+
| fluss-client | `appendPair()`, `appendNewBatchPair()`, `tryAppendPair()`, `hasRoomForPair()`, `sendPair()`, `doSendPair()`, `sendPairWithResult()`, `retractThenUpsert()` |
309+
| fluss-flink | `UpsertSinkWriter.pendingRetractRows` 及配对/flush/close 逻辑;`UpsertWriter.retractThenUpsert()` |
310+
| fluss-server | `processRetractThenAggregateRecords()`, `processRetractThenAggregate()`, `MergeMode` 分发逻辑 |
311+
312+
---
313+
314+
## 七、新增代码汇总
315+
316+
| 模块 | 新增项 |
317+
|------|--------|
318+
| fluss-common | `MutationType` 枚举(UPSERT=0, DELETE=1, RETRACT=2) |
319+
| fluss-common | `DefaultKvRecord` V2 格式读写(`writeToV2` / `readFromV2`,含 MutationType 字节) |
320+
| fluss-common | `KvRecord` 接口新增 `getMutationType()` 默认方法 |
321+
| fluss-client | `WriteRecord.forRetract()` 工厂方法;`WriteRecord.mutationType` 字段 |
322+
| fluss-client | `UpsertWriter.retract(InternalRow)` 接口方法及实现 |
323+
| fluss-server | `processKvRecords()` 中 RETRACT case 处理(独立处理 + 合并优化) |

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ public interface UpsertWriter extends TableWriter {
3838
*/
3939
CompletableFuture<UpsertResult> upsert(InternalRow record);
4040

41+
/**
42+
* Retracts a previous aggregation contribution for the given row. The row must contain the
43+
* primary key fields and the old aggregation values to retract.
44+
*
45+
* <p>This is used when a Flink aggregate operator emits UPDATE_BEFORE (retract old value) for a
46+
* key. The retract record is sent independently with {@link
47+
* org.apache.fluss.record.MutationType#RETRACT} mutation type.
48+
*
49+
* @param row the old aggregation value to retract (UPDATE_BEFORE).
50+
* @return A {@link CompletableFuture} that returns upsert result when complete normally.
51+
*/
52+
default CompletableFuture<UpsertResult> retract(InternalRow row) {
53+
throw new UnsupportedOperationException(
54+
"retract() is not supported by this UpsertWriter implementation.");
55+
}
56+
4157
/**
4258
* Delete a certain record from the Fluss table. The input must contain the primary key fields.
4359
*

0 commit comments

Comments
 (0)