Skip to content

Commit d34c50f

Browse files
[FLUSS] Add producer offset snapshot management for undo recovery
This commit introduces a producer offset snapshot mechanism to support undo recovery when Flink jobs fail before completing their first checkpoint.

Key changes:
- Add ProducerSnapshotManager for snapshot lifecycle management, including registration, retrieval, deletion, and periodic cleanup of expired snapshots
- Add ProducerSnapshotStore for low-level ZooKeeper and remote storage operations
- Add ProducerSnapshot and ProducerSnapshotJsonSerde for the snapshot data model
- Extend the Admin API with registerProducerOffsets, getProducerOffsets, and deleteProducerOffsets operations
- Add RPC protocol definitions for producer offset snapshot management
- Add configuration options for snapshot TTL and cleanup interval
- Add unit tests for ProducerSnapshotManager and JSON serialization
1 parent 864bbe6 commit d34c50f

19 files changed

Lines changed: 2144 additions & 4 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171

7272
import java.util.Collection;
7373
import java.util.List;
74+
import java.util.Map;
7475
import java.util.Optional;
7576
import java.util.concurrent.CompletableFuture;
7677

@@ -604,4 +605,65 @@ CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
604605
* NoRebalanceInProgressException} will be thrown.
605606
*/
606607
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);
608+
609+
// ==================================================================================
610+
// Producer Offset Management APIs (for Exactly-Once Semantics)
611+
// ==================================================================================
612+
613+
/**
 * Registers the offset snapshot of a producer.
 *
 * <p>This method provides atomic "check and register" semantics:
 *
 * <ul>
 *   <li>If no snapshot exists for the producer: a new snapshot is created and the returned
 *       future completes with {@link RegisterResult#CREATED}.
 *   <li>If a snapshot already exists for the producer: it is NOT overwritten and the returned
 *       future completes with {@link RegisterResult#ALREADY_EXISTS}.
 * </ul>
 *
 * <p>The atomicity is guaranteed by the server implementation. This enables the caller to
 * determine whether undo recovery is needed based on the returned value.
 *
 * <p>The snapshot will be automatically cleaned up after the configured TTL expires.
 *
 * <p>This API is typically used by the Flink Operator Coordinator at job startup to register
 * the initial offset snapshot before any data is written.
 *
 * @param producerId the ID of the producer (typically the Flink job ID)
 * @param offsets map of {@link TableBucket} to offset, covering all tables written by the
 *     producer
 * @return a CompletableFuture that completes with the registration result, indicating whether
 *     the snapshot was newly created or already existed
 * @since 0.9
 */
639+
CompletableFuture<RegisterResult> registerProducerOffsets(
640+
String producerId, Map<TableBucket, Long> offsets);
641+
642+
/**
 * Gets the offset snapshot of a producer.
 *
 * <p>This method retrieves the registered offset snapshot for a producer. The method itself
 * never returns {@code null}; instead, the returned future completes with {@code null} if no
 * snapshot exists for the given producer ID.
 *
 * <p>This API is typically used by the Flink Operator Coordinator at job startup to check if a
 * previous snapshot exists (indicating a failover before the first checkpoint).
 *
 * @param producerId the ID of the producer
 * @return a CompletableFuture that completes with the producer offsets, or with {@code null}
 *     if no snapshot is found
 * @since 0.9
 */
655+
CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId);
656+
657+
/**
 * Deletes the offset snapshot of a producer.
 *
 * <p>This method deletes the registered offset snapshot for a producer. It is typically
 * called after the first checkpoint completes successfully, because from then on the
 * checkpoint state is used for recovery instead of the initial snapshot.
 *
 * @param producerId the ID of the producer whose snapshot should be deleted
 * @return a CompletableFuture that completes (with no value) when the deletion succeeds
 * @since 0.9
 */
668+
CompletableFuture<Void> deleteProducerOffsets(String producerId);
607669
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.fluss.rpc.messages.CreateTableRequest;
5858
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
5959
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
60+
import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest;
6061
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
6162
import org.apache.fluss.rpc.messages.DropAclsRequest;
6263
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
@@ -65,6 +66,7 @@
6566
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
6667
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
6768
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
69+
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
6870
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
6971
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
7072
import org.apache.fluss.rpc.messages.ListAclsRequest;
@@ -76,11 +78,15 @@
7678
import org.apache.fluss.rpc.messages.ListTablesRequest;
7779
import org.apache.fluss.rpc.messages.ListTablesResponse;
7880
import org.apache.fluss.rpc.messages.PbAlterConfig;
81+
import org.apache.fluss.rpc.messages.PbBucketOffset;
7982
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
8083
import org.apache.fluss.rpc.messages.PbPartitionSpec;
84+
import org.apache.fluss.rpc.messages.PbTableBucketOffset;
85+
import org.apache.fluss.rpc.messages.PbTableOffsets;
8186
import org.apache.fluss.rpc.messages.PbTablePath;
8287
import org.apache.fluss.rpc.messages.RebalanceRequest;
8388
import org.apache.fluss.rpc.messages.RebalanceResponse;
89+
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
8490
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
8591
import org.apache.fluss.rpc.messages.TableExistsRequest;
8692
import org.apache.fluss.rpc.messages.TableExistsResponse;
@@ -588,6 +594,97 @@ public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
588594
return gateway.cancelRebalance(request).thenApply(r -> null);
589595
}
590596

597+
// ==================================================================================
598+
// Producer Offset Management APIs (for Exactly-Once Semantics)
599+
// ==================================================================================
600+
601+
@Override
602+
public CompletableFuture<RegisterResult> registerProducerOffsets(
603+
String producerId, Map<TableBucket, Long> offsets) {
604+
checkNotNull(producerId, "producerId must not be null");
605+
checkNotNull(offsets, "offsets must not be null");
606+
607+
RegisterProducerOffsetsRequest request = new RegisterProducerOffsetsRequest();
608+
request.setProducerId(producerId);
609+
610+
// Convert TableBucket offsets to PbTableBucketOffset
611+
for (Map.Entry<TableBucket, Long> entry : offsets.entrySet()) {
612+
TableBucket bucket = entry.getKey();
613+
PbTableBucketOffset pbOffset =
614+
request.addBucketOffset()
615+
.setTableId(bucket.getTableId())
616+
.setBucketId(bucket.getBucket())
617+
.setOffset(entry.getValue());
618+
if (bucket.getPartitionId() != null) {
619+
pbOffset.setPartitionId(bucket.getPartitionId());
620+
}
621+
}
622+
623+
return gateway.registerProducerOffsets(request)
624+
.thenApply(
625+
response -> {
626+
// result: 0 = CREATED, 1 = ALREADY_EXISTS
627+
int result = response.hasResult() ? response.getResult() : 0;
628+
return result == 0
629+
? RegisterResult.CREATED
630+
: RegisterResult.ALREADY_EXISTS;
631+
});
632+
}
633+
634+
@Override
635+
public CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId) {
636+
checkNotNull(producerId, "producerId must not be null");
637+
638+
GetProducerOffsetsRequest request = new GetProducerOffsetsRequest();
639+
request.setProducerId(producerId);
640+
641+
return gateway.getProducerOffsets(request)
642+
.thenApply(
643+
response -> {
644+
if (!response.hasProducerId()) {
645+
// No snapshot found for this producer
646+
return null;
647+
}
648+
649+
Map<Long, Map<TableBucket, Long>> tableOffsets = new HashMap<>();
650+
for (PbTableOffsets pbTableOffsets : response.getTableOffsetsList()) {
651+
long tableId = pbTableOffsets.getTableId();
652+
Map<TableBucket, Long> bucketOffsets = new HashMap<>();
653+
654+
for (PbBucketOffset pbBucketOffset :
655+
pbTableOffsets.getBucketOffsetsList()) {
656+
Long partitionId =
657+
pbBucketOffset.hasPartitionId()
658+
? pbBucketOffset.getPartitionId()
659+
: null;
660+
TableBucket bucket =
661+
new TableBucket(
662+
tableId,
663+
partitionId,
664+
pbBucketOffset.getBucketId());
665+
bucketOffsets.put(bucket, pbBucketOffset.getLogEndOffset());
666+
}
667+
668+
tableOffsets.put(tableId, bucketOffsets);
669+
}
670+
671+
long expirationTime =
672+
response.hasExpirationTime() ? response.getExpirationTime() : 0;
673+
return new ProducerOffsetsResult(
674+
response.getProducerId(), tableOffsets, expirationTime);
675+
});
676+
}
677+
678+
@Override
679+
public CompletableFuture<Void> deleteProducerOffsets(String producerId) {
680+
checkNotNull(producerId, "producerId must not be null");
681+
682+
DeleteProducerOffsetsRequest request = new DeleteProducerOffsetsRequest();
683+
request.setProducerId(producerId);
684+
685+
return gateway.deleteProducerOffsets(request).thenApply(r -> null);
686+
}
687+
591688
@Override
592689
public void close() {
593690
// nothing to do yet
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.metadata.TableBucket;
22+
23+
import java.util.Collections;
24+
import java.util.HashMap;
import java.util.Map;
25+
26+
/**
27+
* Result containing producer offset snapshot data.
28+
*
29+
* <p>This class holds the offset snapshot for a producer, which is used for undo recovery when a
30+
* Flink job fails over before completing its first checkpoint.
31+
*
32+
* <p>The snapshot contains bucket offsets organized by table ID, allowing the Flink Operator
33+
* Coordinator to coordinate undo recovery across all subtasks.
34+
*
35+
* @since 0.9
36+
*/
37+
@PublicEvolving
38+
public class ProducerOffsetsResult {
39+
40+
private final String producerId;
41+
private final Map<Long, Map<TableBucket, Long>> tableOffsets;
42+
private final long expirationTime;
43+
44+
/**
45+
* Creates a new ProducerOffsetsResult.
46+
*
47+
* @param producerId the producer ID (typically Flink job ID)
48+
* @param tableOffsets map of table ID to bucket offsets
49+
* @param expirationTime the expiration timestamp in milliseconds
50+
*/
51+
public ProducerOffsetsResult(
52+
String producerId,
53+
Map<Long, Map<TableBucket, Long>> tableOffsets,
54+
long expirationTime) {
55+
this.producerId = producerId;
56+
this.tableOffsets = Collections.unmodifiableMap(tableOffsets);
57+
this.expirationTime = expirationTime;
58+
}
59+
60+
/**
61+
* Get the producer ID.
62+
*
63+
* @return the producer ID
64+
*/
65+
public String getProducerId() {
66+
return producerId;
67+
}
68+
69+
/**
70+
* Get the offset snapshot for all tables.
71+
*
72+
* @return unmodifiable map of table ID to bucket offsets
73+
*/
74+
public Map<Long, Map<TableBucket, Long>> getTableOffsets() {
75+
return tableOffsets;
76+
}
77+
78+
/**
79+
* Get the expiration timestamp.
80+
*
81+
* @return the expiration timestamp in milliseconds since epoch
82+
*/
83+
public long getExpirationTime() {
84+
return expirationTime;
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "ProducerOffsetsResult{"
90+
+ "producerId='"
91+
+ producerId
92+
+ '\''
93+
+ ", tableCount="
94+
+ tableOffsets.size()
95+
+ ", expirationTime="
96+
+ expirationTime
97+
+ '}';
98+
}
99+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.admin;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
22+
/**
23+
* Result of producer offset registration.
24+
*
25+
* <p>This enum indicates whether a producer offset snapshot was newly created or already existed
26+
* when calling {@link Admin#registerProducerOffsets}.
27+
*
28+
* @since 0.9
29+
*/
30+
@PublicEvolving
31+
public enum RegisterResult {
32+
/**
33+
* Snapshot was newly created.
34+
*
35+
* <p>This indicates a first startup scenario where no previous snapshot existed. The caller
36+
* does not need to perform undo recovery.
37+
*/
38+
CREATED,
39+
40+
/**
41+
* Snapshot already existed and was not overwritten.
42+
*
43+
* <p>This indicates a failover scenario where a previous snapshot exists. The caller should
44+
* perform undo recovery using the existing snapshot offsets.
45+
*/
46+
ALREADY_EXISTS
47+
}

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,31 @@ public class ConfigOptions {
354354
+ "The default value is 10. "
355355
+ "This option is deprecated. Please use server.io-pool.size instead.");
356356

357+
/**
 * The TTL (time-to-live) for producer offset snapshots, configured under the key {@code
 * coordinator.producer-snapshot.ttl}. Snapshots older than this TTL will be automatically
 * cleaned up by the coordinator server. The default value is 24 hours.
 */
361+
public static final ConfigOption<Duration> COORDINATOR_PRODUCER_SNAPSHOT_TTL =
362+
key("coordinator.producer-snapshot.ttl")
363+
.durationType()
364+
.defaultValue(Duration.ofHours(24))
365+
.withDescription(
366+
"The TTL (time-to-live) for producer offset snapshots. "
367+
+ "Snapshots older than this TTL will be automatically cleaned up "
368+
+ "by the coordinator server. Default is 24 hours.");
369+
370+
/**
 * The interval for cleaning up expired producer offset snapshots and orphan files in remote
 * storage, configured under the key {@code coordinator.producer-snapshot.cleanup-interval}.
 * The default value is 1 hour.
 */
374+
public static final ConfigOption<Duration> COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL =
375+
key("coordinator.producer-snapshot.cleanup-interval")
376+
.durationType()
377+
.defaultValue(Duration.ofHours(1))
378+
.withDescription(
379+
"The interval for cleaning up expired producer offset snapshots "
380+
+ "and orphan files in remote storage. Default is 1 hour.");
381+
357382
// ------------------------------------------------------------------------
358383
// ConfigOptions for Tablet Server
359384
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)