From 478e8ec58473ed8a1f820ff9c97ac9a7c87d21a7 Mon Sep 17 00:00:00 2001 From: "ocean.wy" Date: Fri, 23 Jan 2026 18:30:46 +0800 Subject: [PATCH] [FLUSS] Add Producer Offset Snapshot for Exactly-Once Semantics This commit introduces the Producer Offset Snapshot feature to support exactly-once semantics in Fluss. The feature allows producers (e.g., Flink jobs) to register their offset snapshots for recovery purposes. Main changes: - Add ProducerSnapshotManager for lifecycle management of producer snapshots - Add ProducerSnapshotStore for low-level storage operations (ZK + remote FS) - Add Admin APIs: registerProducerOffsets, getProducerOffsets, deleteProducerOffsets - Add RetryUtils for IO operations with exponential backoff - Add configuration options for snapshot TTL and cleanup interval Code quality improvements: - Remove ProducerSnapshotResultCodes class, use RegisterResult enum directly - Fix RetryUtils interrupt handling to preserve thread interrupt status - Add comprehensive tests for ProducerSnapshotManager including concurrency tests - Add interrupt handling tests for RetryUtils --- .../org/apache/fluss/client/admin/Admin.java | 63 ++ .../apache/fluss/client/admin/FlussAdmin.java | 83 +++ .../client/admin/ProducerOffsetsResult.java | 99 +++ .../client/utils/ClientRpcMessageUtils.java | 46 ++ .../client/admin/ProducerOffsetsITCase.java | 378 ++++++++++++ .../apache/fluss/config/ConfigOptions.java | 25 + .../fluss/rpc/messages/RegisterResult.java | 69 +++ .../org/apache/fluss/utils/RetryUtils.java | 165 +++++ .../utils/function/ThrowingSupplier.java | 37 ++ .../apache/fluss/utils/RetryUtilsTest.java | 280 +++++++++ .../fluss/rpc/gateway/AdminGateway.java | 40 ++ .../apache/fluss/rpc/protocol/ApiKeys.java | 5 +- fluss-rpc/src/main/proto/FlussApi.proto | 49 ++ .../coordinator/CoordinatorService.java | 148 +++++ .../producer/ProducerSnapshotManager.java | 573 ++++++++++++++++++ .../producer/ProducerSnapshotStore.java | 384 ++++++++++++ .../server/utils/ServerRpcMessageUtils.java | 67 ++ .../fluss/server/zk/ZooKeeperClient.java | 133 ++++ .../apache/fluss/server/zk/data/ZkData.java | 65 ++ .../zk/data/producer/ProducerSnapshot.java | 182 ++++++ .../producer/ProducerSnapshotJsonSerde.java | 110 ++++ .../coordinator/TestCoordinatorGateway.java | 24 + .../producer/ProducerSnapshotManagerTest.java | 357 +++++++++++ .../producer/ProducerSnapshotStoreTest.java | 337 ++++++++++ .../ProducerSnapshotJsonSerdeTest.java | 61 ++ website/docs/maintenance/configuration.md | 8 +- 26 files changed, 3784 insertions(+), 4 deletions(-) create mode 100644 fluss-client/src/main/java/org/apache/fluss/client/admin/ProducerOffsetsResult.java create mode 100644 fluss-client/src/test/java/org/apache/fluss/client/admin/ProducerOffsetsITCase.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/utils/RetryUtils.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/utils/function/ThrowingSupplier.java create mode 100644 fluss-common/src/test/java/org/apache/fluss/utils/RetryUtilsTest.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshot.java create mode 100644 
fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManagerTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerdeTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index 0d8a68cb22..bdba256bee 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -64,6 +64,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.RegisterResult; import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; @@ -71,6 +72,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -604,4 +606,65 @@ CompletableFuture> listRebalanceProgress( * NoRebalanceInProgressException} will be thrown. */ CompletableFuture cancelRebalance(@Nullable String rebalanceId); + + // ================================================================================== + // Producer Offset Management APIs (for Exactly-Once Semantics) + // ================================================================================== + + /** + * Register producer offset snapshot. + * + *

<p>This method provides atomic "check and register" semantics:
 *
 * <ul>
 *   <li>If the snapshot does not exist: create a new snapshot and return {@link
 *       RegisterResult#CREATED}
 *   <li>If the snapshot already exists: do NOT overwrite and return {@link
 *       RegisterResult#ALREADY_EXISTS}
 * </ul>

The atomicity is guaranteed by the server implementation. This enables the caller to + * determine whether undo recovery is needed based on the return value. + * + *
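 *
 * <p>A caller-side sketch of this contract (illustrative only; {@code admin},
 * {@code jobId}, {@code offsets}, and the {@code rollbackTo} recovery hook are
 * assumptions, not part of this patch):
 *
 * <pre>{@code
 * RegisterResult result = admin.registerProducerOffsets(jobId, offsets).get();
 * if (result == RegisterResult.ALREADY_EXISTS) {
 *     // Failover before the first checkpoint: recover using the registered snapshot.
 *     ProducerOffsetsResult snapshot = admin.getProducerOffsets(jobId).get();
 *     rollbackTo(snapshot.getTableOffsets());
 * }
 * }</pre>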

The snapshot will be automatically cleaned up after the configured TTL expires. + * + *
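 *
 * <p>The TTL and cleanup cadence are governed by the options this patch adds
 * ({@code coordinator.producer-snapshot.ttl}, default 24 hours, and
 * {@code coordinator.producer-snapshot.cleanup-interval}, default 1 hour). A
 * server-side configuration sketch, assuming Fluss's {@code Configuration#set}
 * API:
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 * conf.set(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_TTL, Duration.ofHours(6));
 * conf.set(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL, Duration.ofMinutes(30));
 * }</pre>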

This API is typically used by Flink Operator Coordinator at job startup to register the + * initial offset snapshot before any data is written. + * + * @param producerId the ID of the producer (typically Flink job ID) + * @param offsets map of TableBucket to offset for all tables + * @return a CompletableFuture containing the registration result indicating whether the + * snapshot was newly created or already existed + * @since 0.9 + */ + CompletableFuture registerProducerOffsets( + String producerId, Map offsets); + + /** + * Get producer offset snapshot. + * + *

This method retrieves the registered offset snapshot for a producer. Returns null if no + * snapshot exists for the given producer ID. + * + *
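 *
 * <p>Failover-detection sketch (illustrative; {@code jobId} is assumed):
 *
 * <pre>{@code
 * ProducerOffsetsResult snapshot = admin.getProducerOffsets(jobId).get();
 * if (snapshot == null) {
 *     // First start: no prior snapshot, nothing to undo.
 * } else {
 *     // Failover before the first checkpoint: undo using 'snapshot'.
 * }
 * }</pre>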

This API is typically used by Flink Operator Coordinator at job startup to check if a + * previous snapshot exists (indicating a failover before first checkpoint). + * + * @param producerId the ID of the producer + * @return a CompletableFuture containing the producer offsets, or null if not found + * @since 0.9 + */ + CompletableFuture getProducerOffsets(String producerId); + + /** + * Delete producer offset snapshot. + * + *

This method deletes the registered offset snapshot for a producer. This is typically + * called after the first checkpoint completes successfully, as the checkpoint state will be + * used for recovery instead of the initial snapshot. + * + * @param producerId the ID of the producer + * @return a CompletableFuture that completes when deletion succeeds + * @since 0.9 + */ + CompletableFuture deleteProducerOffsets(String producerId); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 88aec06c8d..deb729920e 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -57,6 +57,7 @@ import org.apache.fluss.rpc.messages.CreateTableRequest; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest; import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropDatabaseRequest; @@ -65,6 +66,7 @@ import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest; import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest; +import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; import org.apache.fluss.rpc.messages.GetTableInfoRequest; import org.apache.fluss.rpc.messages.GetTableSchemaRequest; import org.apache.fluss.rpc.messages.ListAclsRequest; @@ -78,9 +80,13 @@ import org.apache.fluss.rpc.messages.PbAlterConfig; import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; +import org.apache.fluss.rpc.messages.PbProducerTableOffsets; +import org.apache.fluss.rpc.messages.PbTableBucketOffset; import org.apache.fluss.rpc.messages.PbTablePath; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.RegisterResult; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; @@ -588,6 +594,83 @@ public CompletableFuture cancelRebalance(@Nullable String rebalanceId) { return gateway.cancelRebalance(request).thenApply(r -> null); } + // ================================================================================== + // Producer Offset Management APIs (for Exactly-Once Semantics) + // ================================================================================== + + @Override + public CompletableFuture registerProducerOffsets( + String producerId, Map offsets) { + checkNotNull(producerId, "producerId must not be null"); + checkNotNull(offsets, "offsets must not be null"); + + RegisterProducerOffsetsRequest request = new RegisterProducerOffsetsRequest(); + request.setProducerId(producerId); + + // Convert TableBucket offsets to PbTableBucketOffset + for (Map.Entry entry : offsets.entrySet()) { + TableBucket bucket = entry.getKey(); + PbTableBucketOffset pbOffset = + request.addBucketOffset() + .setTableId(bucket.getTableId()) + .setBucketId(bucket.getBucket()) + .setOffset(entry.getValue()); + if (bucket.getPartitionId() != null) { + 
pbOffset.setPartitionId(bucket.getPartitionId()); + } + } + + return gateway.registerProducerOffsets(request) + .thenApply( + response -> { + int code = + response.hasResult() + ? response.getResult() + : RegisterResult.CREATED.getCode(); + return RegisterResult.fromCode(code); + }); + } + + @Override + public CompletableFuture getProducerOffsets(String producerId) { + checkNotNull(producerId, "producerId must not be null"); + + GetProducerOffsetsRequest request = new GetProducerOffsetsRequest(); + request.setProducerId(producerId); + + return gateway.getProducerOffsets(request) + .thenApply( + response -> { + if (!response.hasProducerId()) { + return null; + } + + Map> tableOffsets = new HashMap<>(); + for (PbProducerTableOffsets pbTableOffsets : + response.getTableOffsetsList()) { + long tableId = pbTableOffsets.getTableId(); + tableOffsets.put( + tableId, + ClientRpcMessageUtils.toTableBucketOffsets(pbTableOffsets)); + } + + long expirationTime = + response.hasExpirationTime() ? response.getExpirationTime() : 0; + return new ProducerOffsetsResult( + response.getProducerId(), tableOffsets, expirationTime); + }); + } + + @Override + public CompletableFuture deleteProducerOffsets(String producerId) { + checkNotNull(producerId, "producerId must not be null"); + + DeleteProducerOffsetsRequest request = new DeleteProducerOffsetsRequest(); + request.setProducerId(producerId); + + return gateway.deleteProducerOffsets(request).thenApply(r -> null); + } + @Override public void close() { // nothing to do yet diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/ProducerOffsetsResult.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/ProducerOffsetsResult.java new file mode 100644 index 0000000000..7f48eb9052 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/ProducerOffsetsResult.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.Collections; +import java.util.Map; + +/** + * Result containing producer offset snapshot data. + * + *

This class holds the offset snapshot for a producer, which is used for undo recovery when a + * Flink job fails over before completing its first checkpoint. + * + *
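 *
 * <p>Consumption sketch (illustrative): the offsets are exposed as a nested map
 * of table ID to bucket offsets (described below), which callers can flatten
 * back to bucket-level offsets:
 *
 * <pre>{@code
 * Map<TableBucket, Long> flat = new HashMap<>();
 * for (Map<TableBucket, Long> perTable : result.getTableOffsets().values()) {
 *     flat.putAll(perTable);
 * }
 * }</pre>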

The snapshot contains bucket offsets organized by table ID, allowing the Flink Operator + * Coordinator to coordinate undo recovery across all subtasks. + * + * @since 0.9 + */ +@PublicEvolving +public class ProducerOffsetsResult { + + private final String producerId; + private final Map> tableOffsets; + private final long expirationTime; + + /** + * Creates a new ProducerOffsetsResult. + * + * @param producerId the producer ID (typically Flink job ID) + * @param tableOffsets map of table ID to bucket offsets + * @param expirationTime the expiration timestamp in milliseconds + */ + public ProducerOffsetsResult( + String producerId, + Map> tableOffsets, + long expirationTime) { + this.producerId = producerId; + this.tableOffsets = Collections.unmodifiableMap(tableOffsets); + this.expirationTime = expirationTime; + } + + /** + * Get the producer ID. + * + * @return the producer ID + */ + public String getProducerId() { + return producerId; + } + + /** + * Get the offset snapshot for all tables. + * + * @return unmodifiable map of table ID to bucket offsets + */ + public Map> getTableOffsets() { + return tableOffsets; + } + + /** + * Get the expiration timestamp. + * + * @return the expiration timestamp in milliseconds since epoch + */ + public long getExpirationTime() { + return expirationTime; + } + + @Override + public String toString() { + return "ProducerOffsetsResult{" + + "producerId='" + + producerId + + '\'' + + ", tableCount=" + + tableOffsets.size() + + ", expirationTime=" + + expirationTime + + '}'; + } +} diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java index 2fae90d4b5..bba5cf141d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java @@ -542,4 +542,50 @@ public static List toConfigEntries(List pbDescrib pbDescribeConfig.getConfigSource()))) .collect(Collectors.toList()); } + + /** + * Parses a PbTableOffsets into a map of TableBucket to offset. + * + * @param pbTableOffsets the protobuf table offsets + * @return map of TableBucket to offset + */ + public static Map toTableBucketOffsets( + org.apache.fluss.rpc.messages.PbTableOffsets pbTableOffsets) { + Map bucketOffsets = new HashMap<>(); + long tableId = pbTableOffsets.getTableId(); + + for (org.apache.fluss.rpc.messages.PbBucketOffset pbBucketOffset : + pbTableOffsets.getBucketOffsetsList()) { + Long partitionId = + pbBucketOffset.hasPartitionId() ? pbBucketOffset.getPartitionId() : null; + TableBucket bucket = + new TableBucket(tableId, partitionId, pbBucketOffset.getBucketId()); + bucketOffsets.put(bucket, pbBucketOffset.getLogEndOffset()); + } + + return bucketOffsets; + } + + /** + * Parses a PbProducerTableOffsets into a map of TableBucket to offset. + * + * @param pbTableOffsets the protobuf producer table offsets + * @return map of TableBucket to offset + */ + public static Map toTableBucketOffsets( + org.apache.fluss.rpc.messages.PbProducerTableOffsets pbTableOffsets) { + Map bucketOffsets = new HashMap<>(); + long tableId = pbTableOffsets.getTableId(); + + for (org.apache.fluss.rpc.messages.PbBucketOffset pbBucketOffset : + pbTableOffsets.getBucketOffsetsList()) { + Long partitionId = + pbBucketOffset.hasPartitionId() ? 
pbBucketOffset.getPartitionId() : null; + TableBucket bucket = + new TableBucket(tableId, partitionId, pbBucketOffset.getBucketId()); + bucketOffsets.put(bucket, pbBucketOffset.getLogEndOffset()); + } + + return bucketOffsets; + } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ProducerOffsetsITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ProducerOffsetsITCase.java new file mode 100644 index 0000000000..f870050a79 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ProducerOffsetsITCase.java @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.client.admin; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.rpc.messages.RegisterResult; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for Producer Offset Snapshot APIs. + * + *

<p>These tests verify the end-to-end functionality of (a condensed sketch
 * follows the list):
 *
 * <ul>
 *   <li>{@link Admin#registerProducerOffsets(String, Map)}
 *   <li>{@link Admin#getProducerOffsets(String)}
 *   <li>{@link Admin#deleteProducerOffsets(String)}
 * </ul>
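 *
 * <p>A condensed happy-path sketch of the lifecycle these tests exercise
 * (illustrative; {@code admin} comes from the test base class):
 *
 * <pre>{@code
 * String producerId = "job-1";
 * Map<TableBucket, Long> offsets = new HashMap<>();
 * offsets.put(new TableBucket(1L, 0), 100L);
 * admin.registerProducerOffsets(producerId, offsets).get(); // -> CREATED
 * admin.getProducerOffsets(producerId).get();               // -> non-null
 * admin.deleteProducerOffsets(producerId).get();            // idempotent
 * admin.getProducerOffsets(producerId).get();               // -> null
 * }</pre>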
+ */ +class ProducerOffsetsITCase extends ClientToServerITCaseBase { + + // ------------------------------------------------------------------------ + // Basic CRUD Tests + // ------------------------------------------------------------------------ + + @Test + void testRegisterAndGetProducerOffsets() throws Exception { + String producerId = "test-producer-" + System.currentTimeMillis(); + Map offsets = createTestOffsets(); + + // Register offsets + RegisterResult result = admin.registerProducerOffsets(producerId, offsets).get(); + assertThat(result).isEqualTo(RegisterResult.CREATED); + + // Get offsets + ProducerOffsetsResult offsetsResult = admin.getProducerOffsets(producerId).get(); + assertThat(offsetsResult).isNotNull(); + assertThat(offsetsResult.getProducerId()).isEqualTo(producerId); + assertThat(offsetsResult.getExpirationTime()).isGreaterThan(System.currentTimeMillis()); + + // Verify offsets content + Map> tableOffsets = offsetsResult.getTableOffsets(); + assertThat(tableOffsets).isNotEmpty(); + + // Flatten and verify all offsets are present + Map allOffsets = flattenTableOffsets(tableOffsets); + assertThat(allOffsets).hasSize(offsets.size()); + for (Map.Entry entry : offsets.entrySet()) { + assertThat(allOffsets).containsEntry(entry.getKey(), entry.getValue()); + } + + // Cleanup + admin.deleteProducerOffsets(producerId).get(); + } + + @Test + void testRegisterProducerOffsetsAlreadyExists() throws Exception { + String producerId = "test-producer-exists-" + System.currentTimeMillis(); + Map offsets1 = new HashMap<>(); + offsets1.put(new TableBucket(1L, 0), 100L); + + Map offsets2 = new HashMap<>(); + offsets2.put(new TableBucket(2L, 0), 200L); + + try { + // First registration should succeed with CREATED + RegisterResult result1 = admin.registerProducerOffsets(producerId, offsets1).get(); + assertThat(result1).isEqualTo(RegisterResult.CREATED); + + // Second registration should return ALREADY_EXISTS + RegisterResult result2 = admin.registerProducerOffsets(producerId, offsets2).get(); + assertThat(result2).isEqualTo(RegisterResult.ALREADY_EXISTS); + + // Verify original offsets are preserved + ProducerOffsetsResult offsetsResult = admin.getProducerOffsets(producerId).get(); + Map allOffsets = + flattenTableOffsets(offsetsResult.getTableOffsets()); + assertThat(allOffsets).containsKey(new TableBucket(1L, 0)); + assertThat(allOffsets).doesNotContainKey(new TableBucket(2L, 0)); + } finally { + admin.deleteProducerOffsets(producerId).get(); + } + } + + @Test + void testGetProducerOffsetsNotFound() throws Exception { + String producerId = "non-existent-producer-" + System.currentTimeMillis(); + + ProducerOffsetsResult result = admin.getProducerOffsets(producerId).get(); + assertThat(result).isNull(); + } + + @Test + void testDeleteProducerOffsets() throws Exception { + String producerId = "test-producer-delete-" + System.currentTimeMillis(); + Map offsets = createTestOffsets(); + + // Register offsets + admin.registerProducerOffsets(producerId, offsets).get(); + assertThat(admin.getProducerOffsets(producerId).get()).isNotNull(); + + // Delete offsets + admin.deleteProducerOffsets(producerId).get(); + + // Verify deleted + assertThat(admin.getProducerOffsets(producerId).get()).isNull(); + } + + @Test + void testDeleteNonExistentProducerOffsets() throws Exception { + String producerId = "non-existent-producer-delete-" + System.currentTimeMillis(); + + // Should not throw exception + admin.deleteProducerOffsets(producerId).get(); + } + + // 
------------------------------------------------------------------------ + // Partitioned Table Tests + // ------------------------------------------------------------------------ + + @Test + void testRegisterOffsetsWithPartitionedTable() throws Exception { + String producerId = "test-producer-partitioned-" + System.currentTimeMillis(); + Map offsets = new HashMap<>(); + + // Non-partitioned table + offsets.put(new TableBucket(1L, 0), 100L); + offsets.put(new TableBucket(1L, 1), 101L); + + // Partitioned table with different partitions + offsets.put(new TableBucket(2L, 10L, 0), 200L); + offsets.put(new TableBucket(2L, 10L, 1), 201L); + offsets.put(new TableBucket(2L, 20L, 0), 300L); + offsets.put(new TableBucket(2L, 20L, 1), 301L); + + try { + // Register + RegisterResult result = admin.registerProducerOffsets(producerId, offsets).get(); + assertThat(result).isEqualTo(RegisterResult.CREATED); + + // Get and verify + ProducerOffsetsResult offsetsResult = admin.getProducerOffsets(producerId).get(); + assertThat(offsetsResult).isNotNull(); + + Map allOffsets = + flattenTableOffsets(offsetsResult.getTableOffsets()); + assertThat(allOffsets).hasSize(6); + + // Verify non-partitioned table offsets + assertThat(allOffsets.get(new TableBucket(1L, 0))).isEqualTo(100L); + assertThat(allOffsets.get(new TableBucket(1L, 1))).isEqualTo(101L); + + // Verify partitioned table offsets + assertThat(allOffsets.get(new TableBucket(2L, 10L, 0))).isEqualTo(200L); + assertThat(allOffsets.get(new TableBucket(2L, 10L, 1))).isEqualTo(201L); + assertThat(allOffsets.get(new TableBucket(2L, 20L, 0))).isEqualTo(300L); + assertThat(allOffsets.get(new TableBucket(2L, 20L, 1))).isEqualTo(301L); + } finally { + admin.deleteProducerOffsets(producerId).get(); + } + } + + // ------------------------------------------------------------------------ + // Concurrent Registration Tests + // ------------------------------------------------------------------------ + + @Test + void testConcurrentRegistrationSameProducer() throws Exception { + String producerId = "test-concurrent-same-" + System.currentTimeMillis(); + int numThreads = 10; + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + AtomicInteger createdCount = new AtomicInteger(0); + AtomicInteger existsCount = new AtomicInteger(0); + + try { + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + CompletableFuture.runAsync( + () -> { + try { + startLatch.await(); + Map offsets = new HashMap<>(); + offsets.put(new TableBucket(1L, threadId), (long) threadId); + + RegisterResult result = + admin.registerProducerOffsets(producerId, offsets).get(); + if (result == RegisterResult.CREATED) { + createdCount.incrementAndGet(); + } else { + existsCount.incrementAndGet(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + doneLatch.countDown(); + } + }, + executor); + } + + // Start all threads simultaneously + startLatch.countDown(); + assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + // Exactly one should succeed with CREATED + assertThat(createdCount.get()).isEqualTo(1); + assertThat(existsCount.get()).isEqualTo(numThreads - 1); + + // Verify snapshot exists + assertThat(admin.getProducerOffsets(producerId).get()).isNotNull(); + } finally { + executor.shutdown(); + admin.deleteProducerOffsets(producerId).get(); + } + } + + @Test + void testConcurrentRegistrationDifferentProducers() throws 
Exception { + String prefix = "test-concurrent-diff-" + System.currentTimeMillis() + "-"; + int numProducers = 10; + ExecutorService executor = Executors.newFixedThreadPool(numProducers); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numProducers); + AtomicInteger successCount = new AtomicInteger(0); + List producerIds = new ArrayList<>(); + + try { + for (int i = 0; i < numProducers; i++) { + final int producerNum = i; + final String producerId = prefix + producerNum; + producerIds.add(producerId); + + CompletableFuture.runAsync( + () -> { + try { + startLatch.await(); + Map offsets = new HashMap<>(); + offsets.put(new TableBucket(1L, producerNum), (long) producerNum); + + RegisterResult result = + admin.registerProducerOffsets(producerId, offsets).get(); + if (result == RegisterResult.CREATED) { + successCount.incrementAndGet(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + doneLatch.countDown(); + } + }, + executor); + } + + // Start all threads simultaneously + startLatch.countDown(); + assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue(); + + // All should succeed since they're different producers + assertThat(successCount.get()).isEqualTo(numProducers); + + // Verify all snapshots exist + for (String producerId : producerIds) { + assertThat(admin.getProducerOffsets(producerId).get()).isNotNull(); + } + } finally { + executor.shutdown(); + // Cleanup all producers + for (String producerId : producerIds) { + admin.deleteProducerOffsets(producerId).get(); + } + } + } + + // ------------------------------------------------------------------------ + // Edge Cases + // ------------------------------------------------------------------------ + + @Test + void testRegisterEmptyOffsets() throws Exception { + String producerId = "test-producer-empty-" + System.currentTimeMillis(); + Map offsets = new HashMap<>(); + + try { + RegisterResult result = admin.registerProducerOffsets(producerId, offsets).get(); + assertThat(result).isEqualTo(RegisterResult.CREATED); + + ProducerOffsetsResult offsetsResult = admin.getProducerOffsets(producerId).get(); + assertThat(offsetsResult).isNotNull(); + assertThat(offsetsResult.getTableOffsets()).isEmpty(); + } finally { + admin.deleteProducerOffsets(producerId).get(); + } + } + + @Test + void testRegisterLargeNumberOfOffsets() throws Exception { + String producerId = "test-producer-large-" + System.currentTimeMillis(); + Map offsets = new HashMap<>(); + + // Create offsets for 100 tables with 10 buckets each + for (long tableId = 1; tableId <= 100; tableId++) { + for (int bucket = 0; bucket < 10; bucket++) { + offsets.put(new TableBucket(tableId, bucket), tableId * 1000 + bucket); + } + } + + try { + RegisterResult result = admin.registerProducerOffsets(producerId, offsets).get(); + assertThat(result).isEqualTo(RegisterResult.CREATED); + + ProducerOffsetsResult offsetsResult = admin.getProducerOffsets(producerId).get(); + assertThat(offsetsResult).isNotNull(); + + Map allOffsets = + flattenTableOffsets(offsetsResult.getTableOffsets()); + assertThat(allOffsets).hasSize(1000); + + // Verify some random offsets + assertThat(allOffsets.get(new TableBucket(1L, 0))).isEqualTo(1000L); + assertThat(allOffsets.get(new TableBucket(50L, 5))).isEqualTo(50005L); + assertThat(allOffsets.get(new TableBucket(100L, 9))).isEqualTo(100009L); + } finally { + admin.deleteProducerOffsets(producerId).get(); + } + } + + // 
------------------------------------------------------------------------ + // Helper Methods + // ------------------------------------------------------------------------ + + private Map createTestOffsets() { + Map offsets = new HashMap<>(); + offsets.put(new TableBucket(1L, 0), 100L); + offsets.put(new TableBucket(1L, 1), 200L); + offsets.put(new TableBucket(2L, 0), 300L); + return offsets; + } + + private Map flattenTableOffsets( + Map> tableOffsets) { + Map result = new HashMap<>(); + for (Map bucketOffsets : tableOffsets.values()) { + result.putAll(bucketOffsets); + } + return result; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7e04c3a1cc..3a341136eb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -354,6 +354,31 @@ public class ConfigOptions { + "The default value is 10. " + "This option is deprecated. Please use server.io-pool.size instead."); + /** + * The TTL (time-to-live) for producer offset snapshots. Snapshots older than this TTL will be + * automatically cleaned up by the coordinator server. + */ + public static final ConfigOption COORDINATOR_PRODUCER_SNAPSHOT_TTL = + key("coordinator.producer-snapshot.ttl") + .durationType() + .defaultValue(Duration.ofHours(24)) + .withDescription( + "The TTL (time-to-live) for producer offset snapshots. " + + "Snapshots older than this TTL will be automatically cleaned up " + + "by the coordinator server. Default is 24 hours."); + + /** + * The interval for cleaning up expired producer offset snapshots and orphan files in remote + * storage. + */ + public static final ConfigOption COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL = + key("coordinator.producer-snapshot.cleanup-interval") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription( + "The interval for cleaning up expired producer offset snapshots " + + "and orphan files in remote storage. Default is 1 hour."); + // ------------------------------------------------------------------------ // ConfigOptions for Tablet Server // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java new file mode 100644 index 0000000000..c8aeed9d9e --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.rpc.messages; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Result of producer offset registration. + * + *

This enum indicates whether a producer offset snapshot was newly created or already existed + * when calling registerProducerOffsets API. + * + * @since 0.9 + */ +@PublicEvolving +public enum RegisterResult { + /** + * Snapshot was newly created. + * + *

This indicates a first startup scenario where no previous snapshot existed. The caller + * does not need to perform undo recovery. + */ + CREATED(0), + + /** + * Snapshot already existed and was not overwritten. + * + *

This indicates a failover scenario where a previous snapshot exists. The caller should + * perform undo recovery using the existing snapshot offsets. + */ + ALREADY_EXISTS(1); + + /** The code used in RPC messages. */ + private final int code; + + RegisterResult(int code) { + this.code = code; + } + + /** Returns the code used in RPC messages. */ + public int getCode() { + return code; + } + + /** Returns the RegisterResult for the given code. */ + public static RegisterResult fromCode(int code) { + for (RegisterResult result : values()) { + if (result.code == code) { + return result; + } + } + throw new IllegalArgumentException("Unknown RegisterResult code: " + code); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/RetryUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/RetryUtils.java new file mode 100644 index 0000000000..dba1422a90 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/utils/RetryUtils.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils; + +import org.apache.fluss.utils.function.ThrowingSupplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.function.Predicate; + +/** + * Utility class for executing operations with retry logic. + * + *
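 *
 * <p>Usage sketch (illustrative; {@code writeSnapshotFile} is a hypothetical
 * IO operation prone to transient failures): up to 5 attempts, with backoff
 * doubling from 100 ms and capped at 1 s.
 *
 * <pre>{@code
 * long bytesWritten = RetryUtils.executeIOWithRetry(
 *         () -> writeSnapshotFile(data),  // may throw IOException
 *         "write producer snapshot",
 *         5, 100L, 1000L);
 * }</pre>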

This class provides methods to execute operations that may fail transiently, with configurable + * retry count and exponential backoff. + */ +public final class RetryUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class); + + private RetryUtils() {} + + /** + * Executes an operation with retry logic using exponential backoff. + * + * @param operation the operation to execute + * @param operationName a descriptive name for logging + * @param maxRetries maximum number of retry attempts + * @param initialBackoffMs initial backoff delay in milliseconds + * @param maxBackoffMs maximum backoff delay in milliseconds + * @param retryPredicate predicate to determine if an exception should trigger a retry + * @param the return type of the operation + * @return the result of the operation + * @throws IOException if all retries are exhausted or a non-retryable exception occurs + * @throws InterruptedIOException if the thread is interrupted during retry backoff + */ + public static T executeWithRetry( + ThrowingSupplier operation, + String operationName, + int maxRetries, + long initialBackoffMs, + long maxBackoffMs, + Predicate retryPredicate) + throws IOException { + + Exception lastException = null; + long backoffMs = initialBackoffMs; + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + return operation.get(); + } catch (Exception e) { + // Check if the exception is an InterruptedException or wraps one + if (isInterruptedException(e)) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException( + operationName + " was interrupted on attempt " + attempt); + } + + lastException = e; + + if (!retryPredicate.test(e)) { + // Non-retryable exception + throw wrapAsIOException(e, operationName); + } + + if (attempt < maxRetries) { + LOG.warn( + "{} failed (attempt {}/{}), retrying in {} ms", + operationName, + attempt, + maxRetries, + backoffMs, + e); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException( + operationName + + " was interrupted while waiting for retry after attempt " + + attempt); + } + backoffMs = Math.min(backoffMs * 2, maxBackoffMs); + } + } + } + + throw new IOException( + String.format("%s failed after %d attempts", operationName, maxRetries), + lastException); + } + + /** + * Executes an IO operation with retry logic. Retries on any IOException. + * + * @param operation the operation to execute + * @param operationName a descriptive name for logging + * @param maxRetries maximum number of retry attempts + * @param initialBackoffMs initial backoff delay in milliseconds + * @param maxBackoffMs maximum backoff delay in milliseconds + * @param the return type of the operation + * @return the result of the operation + * @throws IOException if all retries are exhausted or a non-retryable exception occurs + * @throws InterruptedIOException if the thread is interrupted during retry backoff + */ + public static T executeIOWithRetry( + ThrowingSupplier operation, + String operationName, + int maxRetries, + long initialBackoffMs, + long maxBackoffMs) + throws IOException { + return executeWithRetry( + operation, + operationName, + maxRetries, + initialBackoffMs, + maxBackoffMs, + e -> e instanceof IOException); + } + + /** + * Checks if the exception is an InterruptedException or wraps one. 
+ * + * @param e the exception to check + * @return true if the exception is or wraps an InterruptedException + */ + private static boolean isInterruptedException(Exception e) { + Throwable current = e; + while (current != null) { + if (current instanceof InterruptedException) { + return true; + } + current = current.getCause(); + } + return false; + } + + private static IOException wrapAsIOException(Exception e, String operationName) { + if (e instanceof IOException) { + return (IOException) e; + } + return new IOException(operationName + " failed", e); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/function/ThrowingSupplier.java b/fluss-common/src/main/java/org/apache/fluss/utils/function/ThrowingSupplier.java new file mode 100644 index 0000000000..e1dacc5842 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/utils/function/ThrowingSupplier.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.utils.function; + +/** + * A functional interface that supplies a result and may throw an exception. + * + * @param the type of results supplied by this supplier + * @param the type of exception that may be thrown + */ +@FunctionalInterface +public interface ThrowingSupplier { + + /** + * Gets a result. + * + * @return a result + * @throws E if unable to supply a result + */ + T get() throws E; +} diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/RetryUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/RetryUtilsTest.java new file mode 100644 index 0000000000..e04745a6bb --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/utils/RetryUtilsTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.utils; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link RetryUtils}. */ +class RetryUtilsTest { + + @Test + void testSuccessOnFirstAttempt() throws IOException { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + RetryUtils.executeWithRetry( + () -> { + attempts.incrementAndGet(); + return "success"; + }, + "testOp", + 3, + 10, + 100, + e -> true); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isEqualTo(1); + } + + @Test + void testSuccessAfterRetries() throws IOException { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + RetryUtils.executeWithRetry( + () -> { + int attempt = attempts.incrementAndGet(); + if (attempt < 3) { + throw new IOException("transient failure"); + } + return "success"; + }, + "testOp", + 5, + 10, + 100, + e -> e instanceof IOException); + + assertThat(result).isEqualTo("success"); + assertThat(attempts.get()).isEqualTo(3); + } + + @Test + void testExhaustedRetries() { + AtomicInteger attempts = new AtomicInteger(0); + + assertThatThrownBy( + () -> + RetryUtils.executeWithRetry( + () -> { + attempts.incrementAndGet(); + throw new IOException("persistent failure"); + }, + "testOp", + 3, + 10, + 100, + e -> e instanceof IOException)) + .isInstanceOf(IOException.class) + .hasMessageContaining("testOp failed after 3 attempts") + .hasCauseInstanceOf(IOException.class); + + assertThat(attempts.get()).isEqualTo(3); + } + + @Test + void testNonRetryableException() { + AtomicInteger attempts = new AtomicInteger(0); + + assertThatThrownBy( + () -> + RetryUtils.executeWithRetry( + () -> { + attempts.incrementAndGet(); + throw new IllegalArgumentException("bad argument"); + }, + "testOp", + 5, + 10, + 100, + e -> e instanceof IOException)) + .isInstanceOf(IOException.class) + .hasMessageContaining("testOp failed") + .hasCauseInstanceOf(IllegalArgumentException.class); + + // Should fail immediately without retries + assertThat(attempts.get()).isEqualTo(1); + } + + @Test + void testIOExceptionPassedThrough() { + IOException originalException = new IOException("original"); + + assertThatThrownBy( + () -> + RetryUtils.executeWithRetry( + () -> { + throw originalException; + }, + "testOp", + 1, + 10, + 100, + e -> false)) + .isInstanceOf(IOException.class) + .isSameAs(originalException); + } + + @Test + void testExecuteIOWithRetry() throws IOException { + AtomicInteger attempts = new AtomicInteger(0); + + String result = + RetryUtils.executeIOWithRetry( + () -> { + int attempt = attempts.incrementAndGet(); + if (attempt < 2) { + throw new IOException("transient"); + } + return "done"; + }, + "ioOp", + 3, + 10, + 100); + + assertThat(result).isEqualTo("done"); + assertThat(attempts.get()).isEqualTo(2); + } + + @Test + void testExecuteIOWithRetryNonIOException() { + AtomicInteger attempts = new AtomicInteger(0); + + assertThatThrownBy( + () -> + RetryUtils.executeIOWithRetry( + () -> { + attempts.incrementAndGet(); + throw new RuntimeException("not IO"); + }, + "ioOp", + 3, + 10, + 100)) + .isInstanceOf(IOException.class) + .hasCauseInstanceOf(RuntimeException.class); + + // RuntimeException is not retryable for executeIOWithRetry + assertThat(attempts.get()).isEqualTo(1); + } + + @Test + void testBackoffCapped() throws 
IOException { + AtomicInteger attempts = new AtomicInteger(0); + long startTime = System.currentTimeMillis(); + + RetryUtils.executeWithRetry( + () -> { + int attempt = attempts.incrementAndGet(); + if (attempt < 4) { + throw new IOException("fail"); + } + return "ok"; + }, + "backoffTest", + 5, + 10, // initial backoff + 50, // max backoff (caps exponential growth) + e -> e instanceof IOException); + + long elapsed = System.currentTimeMillis() - startTime; + // 3 retries: 10ms + 20ms + 40ms (capped to 50ms) = 80ms minimum + // But with cap: 10ms + 20ms + 50ms = 80ms minimum + // Allow some tolerance for test execution + assertThat(elapsed).isGreaterThanOrEqualTo(70); + assertThat(attempts.get()).isEqualTo(4); + } + + @Test + void testInterruptedExceptionFromOperation() { + AtomicInteger attempts = new AtomicInteger(0); + + assertThatThrownBy( + () -> + RetryUtils.executeWithRetry( + () -> { + attempts.incrementAndGet(); + throw new InterruptedException("interrupted"); + }, + "interruptOp", + 5, + 10, + 100, + e -> true)) + .isInstanceOf(InterruptedIOException.class) + .hasMessageContaining("interruptOp was interrupted on attempt 1"); + + // Should fail immediately without retries + assertThat(attempts.get()).isEqualTo(1); + // Verify interrupt status is preserved + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + // Clear interrupt status for other tests + Thread.interrupted(); + } + + @Test + void testInterruptedDuringBackoff() throws Exception { + AtomicInteger attempts = new AtomicInteger(0); + Thread testThread = Thread.currentThread(); + + // Schedule an interrupt during the backoff sleep + Thread interrupter = + new Thread( + () -> { + try { + Thread.sleep(50); // Wait for first attempt to fail + testThread.interrupt(); + } catch (InterruptedException e) { + // ignore + } + }); + interrupter.start(); + + assertThatThrownBy( + () -> + RetryUtils.executeWithRetry( + () -> { + attempts.incrementAndGet(); + throw new IOException("fail"); + }, + "backoffInterruptOp", + 5, + 200, // Long backoff to ensure interrupt happens during + // sleep + 1000, + e -> e instanceof IOException)) + .isInstanceOf(InterruptedIOException.class) + .hasMessageContaining("backoffInterruptOp was interrupted while waiting for retry"); + + interrupter.join(); + // Verify interrupt status is preserved + assertThat(Thread.currentThread().isInterrupted()).isTrue(); + // Clear interrupt status for other tests + Thread.interrupted(); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index 072a5954f1..3885bb387c 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -33,6 +33,8 @@ import org.apache.fluss.rpc.messages.CreatePartitionResponse; import org.apache.fluss.rpc.messages.CreateTableRequest; import org.apache.fluss.rpc.messages.CreateTableResponse; +import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.DeleteProducerOffsetsResponse; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropAclsResponse; import org.apache.fluss.rpc.messages.DropDatabaseRequest; @@ -41,10 +43,14 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import 
org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.RegisterProducerOffsetsResponse; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiKeys; @@ -146,6 +152,40 @@ CompletableFuture listRebalanceProgress( @RPC(api = ApiKeys.CANCEL_REBALANCE) CompletableFuture cancelRebalance(CancelRebalanceRequest request); + // ================================================================================== + // Producer Offset Management APIs (for Exactly-Once Semantics) + // ================================================================================== + + /** + * Register producer offset snapshot with atomic "check and register" semantics. + * + * @param request the request containing producer ID and offsets + * @return response indicating whether snapshot was created or already existed + */ + @RPC(api = ApiKeys.REGISTER_PRODUCER_OFFSETS) + CompletableFuture registerProducerOffsets( + RegisterProducerOffsetsRequest request); + + /** + * Get producer offset snapshot. + * + * @param request the request containing producer ID + * @return response containing the producer offsets + */ + @RPC(api = ApiKeys.GET_PRODUCER_OFFSETS) + CompletableFuture getProducerOffsets( + GetProducerOffsetsRequest request); + + /** + * Delete producer offset snapshot. 
+ * + * @param request the request containing producer ID + * @return response indicating deletion success + */ + @RPC(api = ApiKeys.DELETE_PRODUCER_OFFSETS) + CompletableFuture deleteProducerOffsets( + DeleteProducerOffsetsRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index cc033ba8a9..f962d346cd 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -80,7 +80,10 @@ public enum ApiKeys { REBALANCE(1049, 0, 0, PUBLIC), LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), CANCEL_REBALANCE(1051, 0, 0, PUBLIC), - PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE); + PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE), + REGISTER_PRODUCER_OFFSETS(1053, 0, 0, PUBLIC), + GET_PRODUCER_OFFSETS(1054, 0, 0, PUBLIC), + DELETE_PRODUCER_OFFSETS(1055, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..d620b6e3c8 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -1062,4 +1062,53 @@ message PbBucketOffset { optional int64 partition_id = 1; required int32 bucket_id = 2; optional int64 log_end_offset = 4; +} + +// ------------------------------------------------------------------------------------------ +// Producer Offset Snapshot Management +// ------------------------------------------------------------------------------------------ + +// Register producer offset snapshot request and response +message RegisterProducerOffsetsRequest { + required string producer_id = 1; + repeated PbTableBucketOffset bucket_offsets = 2; + optional int64 ttl_ms = 3; +} + +message RegisterProducerOffsetsResponse { + // Result of registration: 0 = CREATED (new snapshot), 1 = ALREADY_EXISTS (snapshot existed) + optional int32 result = 1; +} + +// Get producer offset snapshot request and response +message GetProducerOffsetsRequest { + required string producer_id = 1; +} + +message GetProducerOffsetsResponse { + optional string producer_id = 1; + optional int64 expiration_time = 2; + repeated PbProducerTableOffsets table_offsets = 3; +} + +// Delete producer offset snapshot request and response +message DeleteProducerOffsetsRequest { + required string producer_id = 1; +} + +message DeleteProducerOffsetsResponse { +} + +// Helper message for table offsets in producer snapshot (without table_path) +message PbProducerTableOffsets { + required int64 table_id = 1; + repeated PbBucketOffset bucket_offsets = 2; +} + +// Helper message for bucket offsets in producer snapshot +message PbTableBucketOffset { + required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + required int64 offset = 4; } \ No newline at end of file diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 94d56dd338..9a42f45e0b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -45,6 +45,7 @@ import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.PartitionSpec; import 
org.apache.fluss.metadata.ResolvedPartitionSpec; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TablePath; @@ -75,6 +76,8 @@ import org.apache.fluss.rpc.messages.CreatePartitionResponse; import org.apache.fluss.rpc.messages.CreateTableRequest; import org.apache.fluss.rpc.messages.CreateTableResponse; +import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.DeleteProducerOffsetsResponse; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropAclsResponse; import org.apache.fluss.rpc.messages.DropDatabaseRequest; @@ -83,6 +86,8 @@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.ListRebalanceProgressRequest; @@ -93,11 +98,15 @@ import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; import org.apache.fluss.rpc.messages.PbPrepareLakeTableRespForTable; +import org.apache.fluss.rpc.messages.PbTableBucketOffset; import org.apache.fluss.rpc.messages.PbTableOffsets; import org.apache.fluss.rpc.messages.PrepareLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.PrepareLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.RebalanceRequest; import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest; +import org.apache.fluss.rpc.messages.RegisterProducerOffsetsResponse; +import org.apache.fluss.rpc.messages.RegisterResult; import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.netty.server.Session; @@ -125,6 +134,7 @@ import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent; import org.apache.fluss.server.coordinator.event.RebalanceEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; +import org.apache.fluss.server.coordinator.producer.ProducerSnapshotManager; import org.apache.fluss.server.coordinator.rebalance.goal.Goal; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.LakeTieringTableInfo; @@ -140,10 +150,14 @@ import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableHelper; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshot; import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.concurrent.FutureUtils; import org.apache.fluss.utils.json.TableBucketOffsets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.UncheckedIOException; @@ -162,11 +176,13 @@ import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; +import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getCommitLakeTableSnapshotData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getCommitRemoteLogManifestData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.groupOffsetsByTableId; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAclsResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableConfigChanges; @@ -179,6 +195,9 @@ /** An RPC Gateway service for coordinator server. */ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorService.class); + private final int defaultBucketNumber; private final int defaultReplicationFactor; private final boolean logTableAllowCreation; @@ -191,6 +210,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; private final ExecutorService ioExecutor; private final LakeTableHelper lakeTableHelper; + private final ProducerSnapshotManager producerSnapshotManager; public CoordinatorService( Configuration conf, @@ -226,6 +246,10 @@ public CoordinatorService( this.ioExecutor = ioExecutor; this.lakeTableHelper = new LakeTableHelper(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)); + + // Initialize and start the producer snapshot manager + this.producerSnapshotManager = new ProducerSnapshotManager(conf, zkClient); + this.producerSnapshotManager.start(); } @Override @@ -235,6 +259,7 @@ public String name() { @Override public void shutdown() { + IOUtils.closeQuietly(producerSnapshotManager, "producer snapshot manager"); IOUtils.closeQuietly(lakeCatalogDynamicLoader, "lake catalog"); } @@ -983,4 +1008,127 @@ public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } } + + // ================================================================================== + // Producer Offset Management APIs (for Exactly-Once Semantics) + // ================================================================================== + + @Override + public CompletableFuture registerProducerOffsets( + RegisterProducerOffsetsRequest request) { + // Authorization: require WRITE permission on cluster + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); + } + + return CompletableFuture.supplyAsync( + () -> { + try { + String producerId = request.getProducerId(); + Map offsets = new HashMap<>(); + + // Convert PbTableBucketOffset to TableBucket offsets + for (PbTableBucketOffset pbOffset : request.getBucketOffsetsList()) { + Long partitionId = + pbOffset.hasPartitionId() ? pbOffset.getPartitionId() : null; + TableBucket bucket = + new TableBucket( + pbOffset.getTableId(), + partitionId, + pbOffset.getBucketId()); + offsets.put(bucket, pbOffset.getOffset()); + } + + // Use custom TTL if provided, otherwise use default (null means use + // manager's default) + Long ttlMs = request.hasTtlMs() ? 
request.getTtlMs() : null; + + // Register with atomic "check and register" semantics + boolean created = + producerSnapshotManager.registerSnapshot( + producerId, offsets, ttlMs); + + RegisterProducerOffsetsResponse response = + new RegisterProducerOffsetsResponse(); + response.setResult( + created + ? RegisterResult.CREATED.getCode() + : RegisterResult.ALREADY_EXISTS.getCode()); + return response; + } catch (Exception e) { + throw new RuntimeException( + "Failed to register producer offsets for producer " + + request.getProducerId(), + e); + } + }, + ioExecutor); + } + + @Override + public CompletableFuture getProducerOffsets( + GetProducerOffsetsRequest request) { + // Authorization: require READ permission on cluster + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.READ, Resource.cluster()); + } + + String producerId = request.getProducerId(); + + return CompletableFuture.supplyAsync( + () -> { + try { + Optional optSnapshot = + producerSnapshotManager.getSnapshotMetadata(producerId); + if (!optSnapshot.isPresent()) { + return new GetProducerOffsetsResponse(); + } + + ProducerSnapshot snapshot = optSnapshot.get(); + Map allOffsets = + producerSnapshotManager.getOffsets(producerId); + Map> offsetsByTable = + groupOffsetsByTableId(allOffsets); + + GetProducerOffsetsResponse response = new GetProducerOffsetsResponse(); + response.setProducerId(producerId); + response.setExpirationTime(snapshot.getExpirationTime()); + + for (Map.Entry> entry : + offsetsByTable.entrySet()) { + addTableOffsetsToResponse(response, entry.getKey(), entry.getValue()); + } + + return response; + } catch (Exception e) { + throw new RuntimeException( + "Failed to get producer offsets for producer " + producerId, e); + } + }, + ioExecutor); + } + + @Override + public CompletableFuture deleteProducerOffsets( + DeleteProducerOffsetsRequest request) { + // Authorization: require WRITE permission on cluster + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); + } + + return CompletableFuture.supplyAsync( + () -> { + try { + String producerId = request.getProducerId(); + producerSnapshotManager.deleteSnapshot(producerId); + return new DeleteProducerOffsetsResponse(); + } catch (Exception e) { + throw new RuntimeException( + "Failed to delete producer offsets for producer " + + request.getProducerId(), + e); + } + }, + ioExecutor); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java new file mode 100644 index 0000000000..a6fc63fd09 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.producer; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileStatus; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshot; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.fluss.utils.types.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Manager for producer offset snapshots lifecycle. + * + *

This manager handles: + * + *

    + *
  • Registering new producer snapshots with atomic "check and register" semantics + *
  • Retrieving producer snapshot offsets + *
  • Deleting producer snapshots + *
  • Periodic cleanup of expired snapshots + *
  • Cleanup of orphan files in remote storage + *
+ * + *

The manager uses a background scheduler to periodically clean up expired snapshots and orphan + * files. The cleanup interval and snapshot TTL are configurable. + * + * @see ProducerSnapshotStore for low-level storage operations + */ +public class ProducerSnapshotManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(ProducerSnapshotManager.class); + + /** Maximum number of attempts for snapshot registration to avoid infinite loops. */ + private static final int MAX_REGISTER_ATTEMPTS = 3; + + private final ProducerSnapshotStore snapshotStore; + private final long defaultTtlMs; + private final long cleanupIntervalMs; + private final ScheduledExecutorService cleanupScheduler; + + public ProducerSnapshotManager(Configuration conf, ZooKeeperClient zkClient) { + this( + new ProducerSnapshotStore(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)), + conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_TTL).toMillis(), + conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL).toMillis()); + } + + @VisibleForTesting + ProducerSnapshotManager( + ProducerSnapshotStore snapshotStore, long defaultTtlMs, long cleanupIntervalMs) { + this.snapshotStore = snapshotStore; + this.defaultTtlMs = defaultTtlMs; + this.cleanupIntervalMs = cleanupIntervalMs; + this.cleanupScheduler = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("producer-snapshot-cleanup")); + } + + /** Starts the background cleanup task. */ + public void start() { + cleanupScheduler.scheduleAtFixedRate( + this::runCleanup, cleanupIntervalMs, cleanupIntervalMs, TimeUnit.MILLISECONDS); + LOG.info( + "Started producer snapshot manager with TTL={} ms, cleanup interval={} ms", + defaultTtlMs, + cleanupIntervalMs); + } + + // ------------------------------------------------------------------------ + // Public API + // ------------------------------------------------------------------------ + + /** + * Registers a new producer offset snapshot with atomic "check and register" semantics. + * + *

This method uses ZooKeeper's version mechanism to handle concurrent requests safely: + * + *

    + *
  • Atomicity: Uses ZK's atomic create for new snapshots + *
  • Idempotency: Concurrent requests with same producerId will have exactly one succeed + * with CREATED, others return ALREADY_EXISTS + *
  • Version-based cleanup: Uses ZK version to safely delete expired snapshots without race + * conditions + *
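+ *
+ * <p>A minimal caller sketch (the producer ID, table ID, and offset values are illustrative):
+ *
+ * <pre>{@code
+ * Map<TableBucket, Long> offsets = new HashMap<>();
+ * offsets.put(new TableBucket(tableId, null, 0), 1024L);
+ * // null TTL means the manager's configured default TTL is used
+ * boolean created = manager.registerSnapshot("flink-job-1", offsets, null);
+ * }</pre>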
+ * + * @param producerId the producer ID (typically Flink job ID) + * @param offsets map of TableBucket to offset for all tables + * @param ttlMs TTL in milliseconds for the snapshot, or null to use default + * @return true if a new snapshot was created (CREATED), false if snapshot already existed + * (ALREADY_EXISTS) + * @throws Exception if the operation fails + */ + public boolean registerSnapshot(String producerId, Map offsets, Long ttlMs) + throws Exception { + long effectiveTtlMs = ttlMs != null ? ttlMs : defaultTtlMs; + + // Use loop instead of recursion to avoid stack overflow risk + for (int attempt = 0; attempt < MAX_REGISTER_ATTEMPTS; attempt++) { + long currentTimeMs = System.currentTimeMillis(); + long expirationTime = currentTimeMs + effectiveTtlMs; + + // Step 1: Try to atomically create the snapshot (common case) + if (snapshotStore.tryStoreSnapshot(producerId, offsets, expirationTime)) { + return true; + } + + // Step 2: Snapshot exists - check if it's valid or expired + RegisterAttemptResult result = + handleExistingSnapshot(producerId, offsets, expirationTime, currentTimeMs); + + switch (result) { + case ALREADY_EXISTS: + return false; + case CREATED: + return true; + case RETRY: + // Continue to next iteration + LOG.debug( + "Retrying snapshot registration for producer {} (attempt {}/{})", + producerId, + attempt + 1, + MAX_REGISTER_ATTEMPTS); + break; + } + } + + // Exhausted all attempts + LOG.warn( + "Failed to register snapshot for producer {} after {} attempts, " + + "concurrent modifications may have interfered", + producerId, + MAX_REGISTER_ATTEMPTS); + return false; + } + + /** + * Handles the case where a snapshot already exists for the producer. + * + * @return the result indicating next action: ALREADY_EXISTS, CREATED, or RETRY + */ + private RegisterAttemptResult handleExistingSnapshot( + String producerId, + Map offsets, + long expirationTime, + long currentTimeMs) + throws Exception { + + Optional> existingWithVersion = + snapshotStore.getSnapshotMetadataWithVersion(producerId); + + // Case 1: Snapshot was deleted between our create attempt and this check + if (!existingWithVersion.isPresent()) { + return snapshotStore.tryStoreSnapshot(producerId, offsets, expirationTime) + ? RegisterAttemptResult.CREATED + : RegisterAttemptResult.RETRY; + } + + ProducerSnapshot existingSnapshot = existingWithVersion.get().f0; + int version = existingWithVersion.get().f1; + + // Case 2: Valid (non-expired) snapshot exists + if (!existingSnapshot.isExpired(currentTimeMs)) { + LOG.info( + "Producer snapshot already exists for producer {} (expires at {}, version {})", + producerId, + existingSnapshot.getExpirationTime(), + version); + return RegisterAttemptResult.ALREADY_EXISTS; + } + + // Case 3: Expired snapshot - try to clean up and create new one + return tryReplaceExpiredSnapshot( + producerId, offsets, expirationTime, existingSnapshot, version, currentTimeMs); + } + + /** + * Attempts to replace an expired snapshot with a new one. 
+ * + * @return the result indicating next action + */ + private RegisterAttemptResult tryReplaceExpiredSnapshot( + String producerId, + Map offsets, + long expirationTime, + ProducerSnapshot expiredSnapshot, + int version, + long currentTimeMs) + throws Exception { + + LOG.info( + "Found expired snapshot for producer {} (version {}), attempting cleanup", + producerId, + version); + + // Try version-based conditional delete + boolean deleted = + snapshotStore.deleteSnapshotIfVersion(producerId, expiredSnapshot, version); + + if (!deleted) { + // Version mismatch - another process modified the snapshot + LOG.debug( + "Version mismatch during delete for producer {}, checking current state", + producerId); + return checkCurrentSnapshotState(producerId, currentTimeMs); + } + + // Successfully deleted, try to create new snapshot + if (snapshotStore.tryStoreSnapshot(producerId, offsets, expirationTime)) { + return RegisterAttemptResult.CREATED; + } + + // Another concurrent request created a snapshot after our delete + LOG.debug( + "Concurrent creation detected for producer {} after delete, checking state", + producerId); + return checkCurrentSnapshotState(producerId, currentTimeMs); + } + + /** + * Checks the current snapshot state and returns the appropriate result. + * + *

This method is used after detecting concurrent modifications to determine whether a valid + * snapshot now exists or if we should retry. + * + * @param producerId the producer ID + * @param currentTimeMs the current time for expiration check + * @return ALREADY_EXISTS if a valid snapshot exists, RETRY otherwise + */ + private RegisterAttemptResult checkCurrentSnapshotState(String producerId, long currentTimeMs) + throws Exception { + Optional snapshot = snapshotStore.getSnapshotMetadata(producerId); + + if (isValidSnapshot(snapshot, currentTimeMs)) { + LOG.info( + "Valid snapshot exists for producer {} after concurrent modification", + producerId); + return RegisterAttemptResult.ALREADY_EXISTS; + } + + LOG.debug("No valid snapshot for producer {} after concurrent modification", producerId); + return RegisterAttemptResult.RETRY; + } + + /** + * Checks if a snapshot is present and not expired. + * + * @param snapshot the optional snapshot + * @param currentTimeMs the current time for expiration check + * @return true if snapshot exists and is not expired + */ + private boolean isValidSnapshot(Optional snapshot, long currentTimeMs) { + return snapshot.isPresent() && !snapshot.get().isExpired(currentTimeMs); + } + + /** Result of a single registration attempt. */ + private enum RegisterAttemptResult { + /** Snapshot was successfully created. */ + CREATED, + /** A valid snapshot already exists. */ + ALREADY_EXISTS, + /** Should retry the registration. */ + RETRY + } + + /** + * Gets the snapshot metadata for a producer. + * + * @param producerId the producer ID + * @return Optional containing the snapshot if exists + * @throws Exception if the operation fails + */ + public Optional getSnapshotMetadata(String producerId) throws Exception { + return snapshotStore.getSnapshotMetadata(producerId); + } + + /** + * Reads all offsets for a producer. + * + * @param producerId the producer ID + * @return map of TableBucket to offset, or empty map if no snapshot exists + * @throws Exception if the operation fails + */ + public Map getOffsets(String producerId) throws Exception { + return snapshotStore.readOffsets(producerId); + } + + /** + * Deletes a producer snapshot. + * + * @param producerId the producer ID + * @throws Exception if the operation fails + */ + public void deleteSnapshot(String producerId) throws Exception { + snapshotStore.deleteSnapshot(producerId); + } + + /** + * Gets the default TTL in milliseconds. + * + * @return the default TTL + */ + public long getDefaultTtlMs() { + return defaultTtlMs; + } + + // ------------------------------------------------------------------------ + // Cleanup Operations + // ------------------------------------------------------------------------ + + /** Runs the cleanup task (expired snapshots and orphan files). */ + @VisibleForTesting + void runCleanup() { + try { + int expiredCount = cleanupExpiredSnapshots(); + if (expiredCount > 0) { + LOG.info("Producer snapshot cleanup: removed {} expired snapshots", expiredCount); + } + + int orphanCount = cleanupOrphanFiles(); + if (orphanCount > 0) { + LOG.info("Producer snapshot cleanup: removed {} orphan files", orphanCount); + } + } catch (Exception e) { + LOG.warn("Failed to cleanup producer snapshots", e); + } + } + + /** + * Cleans up expired producer snapshots. 
+ * + * @return number of snapshots cleaned up + * @throws Exception if the operation fails + */ + @VisibleForTesting + int cleanupExpiredSnapshots() throws Exception { + List producerIds = snapshotStore.listProducerIds(); + int cleanedCount = 0; + long currentTime = System.currentTimeMillis(); + + for (String producerId : producerIds) { + try { + Optional optSnapshot = + snapshotStore.getSnapshotMetadata(producerId); + if (isExpiredSnapshot(optSnapshot, currentTime)) { + snapshotStore.deleteSnapshot(producerId); + cleanedCount++; + LOG.debug("Cleaned up expired snapshot for producer {}", producerId); + } + } catch (Exception e) { + LOG.warn("Failed to cleanup snapshot for producer {}", producerId, e); + } + } + + return cleanedCount; + } + + /** + * Checks if a snapshot is present and expired. + * + * @param snapshot the optional snapshot + * @param currentTimeMs the current time for expiration check + * @return true if snapshot exists and is expired + */ + private boolean isExpiredSnapshot(Optional snapshot, long currentTimeMs) { + return snapshot.isPresent() && snapshot.get().isExpired(currentTimeMs); + } + + /** + * Cleans up orphan files in remote storage that don't have corresponding ZK metadata. + * + *

Orphan files can occur when: + * + *

    + *
  • ZK metadata was deleted but remote file deletion failed + *
  • Process crashed after writing remote file but before creating ZK metadata + *
  • Network issues caused partial cleanup + *
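+ *
+ * <p>For reference, offset files are laid out under the directory returned by {@link
+ * ProducerSnapshotStore#getProducersDirectory()} (the concrete path below is illustrative):
+ *
+ * <pre>{@code
+ * {remote.data.dir}/producers/{producerId}/{tableId}/{uuid}.offsets
+ * }</pre>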
+ * + * @return number of orphan files cleaned up + * @throws Exception if the operation fails + */ + @VisibleForTesting + int cleanupOrphanFiles() throws Exception { + FsPath producersDir = snapshotStore.getProducersDirectory(); + FileSystem fileSystem = producersDir.getFileSystem(); + + if (!fileSystem.exists(producersDir)) { + LOG.debug("Producers directory does not exist, no orphan files to clean"); + return 0; + } + + // Get all producer IDs from ZK (these are valid) + Set validProducerIds = new HashSet<>(snapshotStore.listProducerIds()); + + // Collect all valid file paths from ZK metadata + Set validFilePaths = collectValidFilePaths(validProducerIds); + + int cleanedCount = 0; + + // List all producer directories in remote storage + FileStatus[] producerDirs = fileSystem.listStatus(producersDir); + if (producerDirs == null) { + return 0; + } + + for (FileStatus producerDirStatus : producerDirs) { + if (!producerDirStatus.isDir()) { + continue; + } + + String producerId = producerDirStatus.getPath().getName(); + + // Case 1: Producer ID not in ZK - entire directory is orphan + if (!validProducerIds.contains(producerId)) { + LOG.info( + "Found orphan producer directory {} (no ZK metadata), cleaning up", + producerId); + cleanedCount += + snapshotStore.deleteDirectoryRecursively( + fileSystem, producerDirStatus.getPath()); + continue; + } + + // Case 2: Producer ID exists in ZK - check individual files + cleanedCount += + cleanupOrphanFilesForProducer( + fileSystem, producerDirStatus.getPath(), validFilePaths); + } + + return cleanedCount; + } + + private Set collectValidFilePaths(Set validProducerIds) { + Set validFilePaths = new HashSet<>(); + for (String producerId : validProducerIds) { + try { + Optional optSnapshot = + snapshotStore.getSnapshotMetadata(producerId); + if (optSnapshot.isPresent()) { + for (ProducerSnapshot.TableOffsetMetadata tableMetadata : + optSnapshot.get().getTableOffsets()) { + validFilePaths.add(tableMetadata.getOffsetsPath().toString()); + } + } + } catch (Exception e) { + LOG.warn( + "Failed to get snapshot for producer {} during orphan cleanup, " + + "skipping its files", + producerId, + e); + } + } + return validFilePaths; + } + + private int cleanupOrphanFilesForProducer( + FileSystem fileSystem, FsPath producerDir, Set validFilePaths) + throws IOException { + int cleanedCount = 0; + + // List table directories under producer + FileStatus[] tableDirs = fileSystem.listStatus(producerDir); + if (tableDirs == null) { + return 0; + } + + for (FileStatus tableDirStatus : tableDirs) { + if (!tableDirStatus.isDir()) { + continue; + } + + // List offset files under table directory + FileStatus[] offsetFiles = fileSystem.listStatus(tableDirStatus.getPath()); + if (offsetFiles == null) { + continue; + } + + boolean hasValidFiles = false; + for (FileStatus fileStatus : offsetFiles) { + if (fileStatus.isDir()) { + continue; + } + + String filePath = fileStatus.getPath().toString(); + if (!validFilePaths.contains(filePath)) { + // This file is not referenced by any ZK metadata - it's orphan + LOG.debug("Deleting orphan file: {}", filePath); + try { + fileSystem.delete(fileStatus.getPath(), false); + cleanedCount++; + } catch (IOException e) { + LOG.warn("Failed to delete orphan file: {}", filePath, e); + } + } else { + hasValidFiles = true; + } + } + + // If table directory is now empty, delete it + if (!hasValidFiles) { + tryDeleteEmptyDirectory(fileSystem, tableDirStatus.getPath()); + } + } + + // Check if producer directory is now empty + 
tryDeleteEmptyDirectory(fileSystem, producerDir); + + return cleanedCount; + } + + private void tryDeleteEmptyDirectory(FileSystem fileSystem, FsPath dir) { + try { + FileStatus[] remaining = fileSystem.listStatus(dir); + if (remaining == null || remaining.length == 0) { + fileSystem.delete(dir, false); + LOG.debug("Deleted empty directory: {}", dir); + } + } catch (IOException e) { + LOG.warn("Failed to delete empty directory: {}", dir, e); + } + } + + @Override + public void close() { + if (cleanupScheduler != null) { + cleanupScheduler.shutdown(); + try { + if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) { + cleanupScheduler.shutdownNow(); + } + } catch (InterruptedException e) { + cleanupScheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + LOG.info("Producer snapshot manager closed"); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java new file mode 100644 index 0000000000..d343307f8a --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.producer; + +import org.apache.fluss.fs.FSDataInputStream; +import org.apache.fluss.fs.FSDataOutputStream; +import org.apache.fluss.fs.FileStatus; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshot; +import org.apache.fluss.utils.IOUtils; +import org.apache.fluss.utils.RetryUtils; +import org.apache.fluss.utils.json.TableBucketOffsets; +import org.apache.fluss.utils.types.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +/** + * Low-level storage operations for producer offset snapshots. + * + *

This class handles: + * + *

    + *
  • Writing offset data to remote storage (OSS/S3/HDFS) + *
  • Reading offset data from remote storage + *
  • Registering snapshot metadata in ZooKeeper + *
  • Deleting snapshots from both ZK and remote storage + *
+ * + *

This class is stateless and thread-safe. Lifecycle management should be handled by {@link + * ProducerSnapshotManager}. + */ +public class ProducerSnapshotStore { + + private static final Logger LOG = LoggerFactory.getLogger(ProducerSnapshotStore.class); + + private static final int MAX_RETRIES = 3; + private static final long INITIAL_BACKOFF_MS = 100; + private static final long MAX_BACKOFF_MS = 1000; + + private final ZooKeeperClient zkClient; + private final String remoteDataDir; + + public ProducerSnapshotStore(ZooKeeperClient zkClient, String remoteDataDir) { + this.zkClient = zkClient; + this.remoteDataDir = remoteDataDir; + } + + // ------------------------------------------------------------------------ + // Core CRUD Operations + // ------------------------------------------------------------------------ + + /** + * Atomically stores a new producer offset snapshot. + * + *

This method first writes offset files to remote storage, then attempts to atomically + * create the ZK metadata. If the ZK node already exists, this method returns false and cleans + * up the remote files. + * + *

Note on potential orphan files: There is a small window where orphan files can be + * created if the process crashes after writing remote files but before creating ZK metadata, or + * if ZK metadata creation fails. This is an acceptable trade-off because: + * + *

    + *
  • The alternative (ZK first, then files) would leave ZK metadata pointing to non-existent + * files, which is worse + *
  • Orphan files are harmless and will be cleaned up by {@link + * ProducerSnapshotManager#cleanupOrphanFiles()} + *
  • A unified orphan file cleanup mechanism will handle these cases in the future + *
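+ *
+ * <p>A caller sketch (the expiration timestamp computation is illustrative):
+ *
+ * <pre>{@code
+ * long expirationTime = System.currentTimeMillis() + ttlMs;
+ * boolean created = store.tryStoreSnapshot("flink-job-1", offsets, expirationTime);
+ * }</pre>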
+ * + * @param producerId the producer ID + * @param offsets map of TableBucket to offset + * @param expirationTime the expiration timestamp in milliseconds + * @return true if created, false if already existed + * @throws Exception if the operation fails + */ + public boolean tryStoreSnapshot( + String producerId, Map offsets, long expirationTime) + throws Exception { + + Map> offsetsByTable = groupOffsetsByTable(offsets); + List tableMetadatas = new ArrayList<>(); + List createdFiles = new ArrayList<>(); + + try { + // Write offset files to remote storage + for (Map.Entry> entry : offsetsByTable.entrySet()) { + FsPath path = writeOffsetsFile(producerId, entry.getKey(), entry.getValue()); + createdFiles.add(path); + tableMetadatas.add(new ProducerSnapshot.TableOffsetMetadata(entry.getKey(), path)); + } + + // Atomically create ZK metadata + ProducerSnapshot snapshot = new ProducerSnapshot(expirationTime, tableMetadatas); + boolean created = zkClient.tryRegisterProducerSnapshot(producerId, snapshot); + + if (!created) { + LOG.info( + "Snapshot already exists for producer {}, cleaning up {} files", + producerId, + createdFiles.size()); + cleanupFilesSafely(createdFiles); + return false; + } + + LOG.info( + "Stored snapshot for producer {} with {} tables, expires at {}", + producerId, + tableMetadatas.size(), + expirationTime); + return true; + + } catch (Exception e) { + LOG.warn( + "Failed to store snapshot for producer {}, cleaning up {} files", + producerId, + createdFiles.size(), + e); + cleanupFilesSafely(createdFiles); + throw e; + } + } + + /** Gets the snapshot metadata for a producer. */ + public Optional getSnapshotMetadata(String producerId) throws Exception { + return zkClient.getProducerSnapshot(producerId); + } + + /** + * Gets the snapshot metadata for a producer along with its ZK version. + * + *

The version can be used for conditional deletes to handle concurrent modifications safely. + * + * @param producerId the producer ID + * @return Optional containing a Tuple2 of (ProducerSnapshot, version) if exists + * @throws Exception if the operation fails + */ + public Optional> getSnapshotMetadataWithVersion( + String producerId) throws Exception { + return zkClient.getProducerSnapshotWithVersion(producerId); + } + + /** Reads all offsets for a producer from remote storage. */ + public Map readOffsets(String producerId) throws Exception { + Optional optSnapshot = zkClient.getProducerSnapshot(producerId); + if (!optSnapshot.isPresent()) { + return new HashMap<>(); + } + + Map allOffsets = new HashMap<>(); + for (ProducerSnapshot.TableOffsetMetadata metadata : optSnapshot.get().getTableOffsets()) { + allOffsets.putAll(readOffsetsFile(metadata.getOffsetsPath())); + } + return allOffsets; + } + + /** Deletes a producer snapshot (both ZK metadata and remote files). */ + public void deleteSnapshot(String producerId) throws Exception { + Optional optSnapshot = zkClient.getProducerSnapshot(producerId); + if (!optSnapshot.isPresent()) { + LOG.debug("No snapshot found for producer {}", producerId); + return; + } + + // Delete remote files + for (ProducerSnapshot.TableOffsetMetadata metadata : optSnapshot.get().getTableOffsets()) { + deleteRemoteFile(metadata.getOffsetsPath()); + } + + // Delete ZK metadata + zkClient.deleteProducerSnapshot(producerId); + LOG.info("Deleted snapshot for producer {}", producerId); + } + + /** + * Deletes a producer snapshot only if the ZK version matches. + * + *

This provides optimistic concurrency control - the delete will only succeed if no other + * process has modified the snapshot since it was read. + * + * @param producerId the producer ID + * @param snapshot the snapshot to delete (used to get file paths) + * @param expectedVersion the expected ZK version + * @return true if deleted successfully, false if version mismatch + * @throws Exception if the operation fails for reasons other than version mismatch + */ + public boolean deleteSnapshotIfVersion( + String producerId, ProducerSnapshot snapshot, int expectedVersion) throws Exception { + // First try to delete ZK metadata with version check + boolean deleted = zkClient.deleteProducerSnapshotIfVersion(producerId, expectedVersion); + if (!deleted) { + LOG.debug( + "Failed to delete snapshot for producer {} - version mismatch, " + + "snapshot was modified by another process", + producerId); + return false; + } + + // ZK metadata deleted successfully, now clean up remote files + // Even if file deletion fails, the snapshot is already gone from ZK + // Orphan files will be cleaned up by the periodic cleanup task + for (ProducerSnapshot.TableOffsetMetadata metadata : snapshot.getTableOffsets()) { + deleteRemoteFile(metadata.getOffsetsPath()); + } + + LOG.info("Deleted snapshot for producer {} with version {}", producerId, expectedVersion); + return true; + } + + /** Lists all producer IDs with registered snapshots. */ + public List listProducerIds() throws Exception { + return zkClient.listProducerIds(); + } + + // ------------------------------------------------------------------------ + // Remote Storage Operations + // ------------------------------------------------------------------------ + + /** Gets the base directory for producer snapshots in remote storage. */ + public FsPath getProducersDirectory() { + return new FsPath(remoteDataDir + "/producers"); + } + + /** + * Deletes a remote file, logging but not throwing on failure. + * + * @param filePath the file path to delete + * @return true if deleted successfully or file didn't exist, false if deletion failed + */ + public boolean deleteRemoteFile(FsPath filePath) { + try { + FileSystem fs = filePath.getFileSystem(); + if (fs.exists(filePath)) { + fs.delete(filePath, false); + LOG.debug("Deleted remote file: {}", filePath); + } + return true; + } catch (IOException e) { + LOG.warn("Failed to delete remote file: {}", filePath, e); + return false; + } + } + + /** Recursively deletes a directory, returning the count of deleted files. 
*/ + public int deleteDirectoryRecursively(FileSystem fs, FsPath path) { + int count = 0; + try { + FileStatus[] contents = fs.listStatus(path); + if (contents != null) { + for (FileStatus status : contents) { + if (status.isDir()) { + count += deleteDirectoryRecursively(fs, status.getPath()); + } else if (fs.delete(status.getPath(), false)) { + count++; + } + } + } + fs.delete(path, false); + } catch (IOException e) { + LOG.warn("Failed to delete directory: {}", path, e); + } + return count; + } + + // ------------------------------------------------------------------------ + // Private Helpers + // ------------------------------------------------------------------------ + + private Map> groupOffsetsByTable(Map offsets) { + Map> result = new HashMap<>(); + for (Map.Entry entry : offsets.entrySet()) { + result.computeIfAbsent(entry.getKey().getTableId(), k -> new HashMap<>()) + .put(entry.getKey(), entry.getValue()); + } + return result; + } + + private FsPath writeOffsetsFile(String producerId, long tableId, Map offsets) + throws IOException { + + String fileName = UUID.randomUUID() + ".offsets"; + FsPath path = + new FsPath( + new FsPath( + new FsPath(new FsPath(remoteDataDir, "producers"), producerId), + String.valueOf(tableId)), + fileName); + + byte[] data = new TableBucketOffsets(tableId, offsets).toJsonBytes(); + + return RetryUtils.executeIOWithRetry( + () -> { + FileSystem fs = path.getFileSystem(); + FsPath parentDir = path.getParent(); + if (parentDir != null && !fs.exists(parentDir)) { + fs.mkdirs(parentDir); + } + try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE)) { + out.write(data); + } + LOG.debug( + "Wrote offsets for producer {} table {} at {}", + producerId, + tableId, + path); + return path; + }, + String.format("write offsets file for producer %s table %d", producerId, tableId), + MAX_RETRIES, + INITIAL_BACKOFF_MS, + MAX_BACKOFF_MS); + } + + private Map readOffsetsFile(FsPath path) throws IOException { + return RetryUtils.executeIOWithRetry( + () -> { + FileSystem fs = path.getFileSystem(); + if (!fs.exists(path)) { + throw new IOException("Offsets file not found: " + path); + } + try (FSDataInputStream in = fs.open(path); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + IOUtils.copyBytes(in, out, true); + return TableBucketOffsets.fromJsonBytes(out.toByteArray()).getOffsets(); + } + }, + "read offsets file " + path, + MAX_RETRIES, + INITIAL_BACKOFF_MS, + MAX_BACKOFF_MS); + } + + /** + * Safely cleans up a list of files, ensuring all files are attempted even if some fail. + * + *

This method reuses {@link #deleteRemoteFile(FsPath)} for each file and logs a summary if + * any deletions failed. + * + * @param files the list of files to clean up + */ + private void cleanupFilesSafely(List files) { + int failedCount = 0; + for (FsPath file : files) { + if (!deleteRemoteFile(file)) { + failedCount++; + } + } + if (failedCount > 0) { + LOG.warn( + "Failed to cleanup {} out of {} files, orphan files will be cleaned by periodic task", + failedCount, + files.size()); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f174f727a4..72dfa91e60 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -68,6 +68,7 @@ import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse; import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse; import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse; +import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.LimitScanResponse; @@ -122,6 +123,7 @@ import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket; import org.apache.fluss.rpc.messages.PbProduceLogReqForBucket; import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket; +import org.apache.fluss.rpc.messages.PbProducerTableOffsets; import org.apache.fluss.rpc.messages.PbPutKvReqForBucket; import org.apache.fluss.rpc.messages.PbPutKvRespForBucket; import org.apache.fluss.rpc.messages.PbRebalancePlanForBucket; @@ -1624,6 +1626,71 @@ public static TableBucketOffsets toTableBucketOffsets(PbTableOffsets pbTableOffs return new TableBucketOffsets(tableId, bucketOffsets); } + /** + * Populates a PbTableOffsets with bucket offsets. + * + * @param pbTableOffsets the protobuf table offsets to populate + * @param bucketOffsets map of TableBucket to offset + */ + public static void populatePbTableOffsets( + PbTableOffsets pbTableOffsets, Map bucketOffsets) { + for (Map.Entry entry : bucketOffsets.entrySet()) { + TableBucket bucket = entry.getKey(); + PbBucketOffset pbBucketOffset = + pbTableOffsets + .addBucketOffset() + .setBucketId(bucket.getBucket()) + .setLogEndOffset(entry.getValue()); + if (bucket.getPartitionId() != null) { + pbBucketOffset.setPartitionId(bucket.getPartitionId()); + } + } + } + + /** + * Groups offsets by table ID. + * + * @param allOffsets map of TableBucket to offset + * @return map of tableId to (map of TableBucket to offset) + */ + public static Map> groupOffsetsByTableId( + Map allOffsets) { + Map> offsetsByTable = new HashMap<>(); + for (Map.Entry entry : allOffsets.entrySet()) { + TableBucket bucket = entry.getKey(); + offsetsByTable + .computeIfAbsent(bucket.getTableId(), k -> new HashMap<>()) + .put(bucket, entry.getValue()); + } + return offsetsByTable; + } + + /** + * Populates a GetProducerOffsetsResponse with table offsets. 
+ * + * @param response the response to populate + * @param tableId the table ID + * @param bucketOffsets the bucket offsets for this table + */ + public static void addTableOffsetsToResponse( + GetProducerOffsetsResponse response, + long tableId, + Map bucketOffsets) { + PbProducerTableOffsets pbTableOffsets = response.addTableOffset(); + pbTableOffsets.setTableId(tableId); + for (Map.Entry entry : bucketOffsets.entrySet()) { + TableBucket bucket = entry.getKey(); + PbBucketOffset pbBucketOffset = + pbTableOffsets + .addBucketOffset() + .setBucketId(bucket.getBucket()) + .setLogEndOffset(entry.getValue()); + if (bucket.getPartitionId() != null) { + pbBucketOffset.setPartitionId(bucket.getPartitionId()); + } + } + } + public static PbNotifyLakeTableOffsetReqForBucket makeNotifyLakeTableOffsetForBucket( TableBucket tableBucket, LakeTableSnapshot lakeTableSnapshot, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index bd6bcad7f3..1fc95e8a15 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -66,6 +66,8 @@ import org.apache.fluss.server.zk.data.ZkData.PartitionSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionsZNode; +import org.apache.fluss.server.zk.data.ZkData.ProducerIdZNode; +import org.apache.fluss.server.zk.data.ZkData.ProducersZNode; import org.apache.fluss.server.zk.data.ZkData.RebalanceZNode; import org.apache.fluss.server.zk.data.ZkData.ResourceAclNode; import org.apache.fluss.server.zk.data.ZkData.SchemaZNode; @@ -80,6 +82,7 @@ import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshot; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.BackgroundCallback; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.CuratorEvent; @@ -1590,4 +1593,134 @@ public static Map> processGetChildrenResponses( } return result; } + + // -------------------------------------------------------------------------------------------- + // Producer Offset Snapshot + // -------------------------------------------------------------------------------------------- + + /** + * Tries to atomically register a producer offset snapshot to ZK. + * + *

This method leverages ZooKeeper's atomic create operation to ensure that only one + * concurrent request can successfully create the snapshot. If a snapshot already exists for the + * given producer ID, this method returns false instead of throwing an exception. + * + * @param producerId the producer ID (typically Flink job ID) + * @param snapshot the producer snapshot containing expiration time and table offset metadata + * @return true if the snapshot was created successfully, false if a snapshot already exists + * @throws Exception if the operation fails for reasons other than node already existing + */ + public boolean tryRegisterProducerSnapshot(String producerId, ProducerSnapshot snapshot) + throws Exception { + String path = ProducerIdZNode.path(producerId); + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, ProducerIdZNode.encode(snapshot)); + LOG.info("Registered producer snapshot for producer {} at path {}.", producerId, path); + return true; + } catch (KeeperException.NodeExistsException e) { + LOG.debug( + "Producer snapshot already exists for producer {} at path {}, " + + "returning false.", + producerId, + path); + return false; + } + } + + /** + * Gets the {@link ProducerSnapshot} for the given producer ID. + * + * @param producerId the producer ID + * @return an Optional containing the ProducerSnapshot if it exists, empty otherwise + * @throws Exception if the operation fails + */ + public Optional getProducerSnapshot(String producerId) throws Exception { + String zkPath = ProducerIdZNode.path(producerId); + return getOrEmpty(zkPath).map(ProducerIdZNode::decode); + } + + /** + * Deletes the producer offset snapshot for the given producer ID. + * + * @param producerId the producer ID + * @throws Exception if the operation fails + */ + public void deleteProducerSnapshot(String producerId) throws Exception { + String path = ProducerIdZNode.path(producerId); + zkClient.delete().forPath(path); + LOG.info("Deleted producer snapshot for producer {} at path {}.", producerId, path); + } + + /** + * Gets the {@link ProducerSnapshot} for the given producer ID along with its ZK version. + * + *

The version can be used for conditional updates/deletes to handle concurrent modifications + * safely. + * + * @param producerId the producer ID + * @return an Optional containing a Tuple2 of (ProducerSnapshot, version) if it exists, empty + * otherwise + * @throws Exception if the operation fails + */ + public Optional> getProducerSnapshotWithVersion( + String producerId) throws Exception { + String zkPath = ProducerIdZNode.path(producerId); + try { + Stat stat = new Stat(); + byte[] data = zkClient.getData().storingStatIn(stat).forPath(zkPath); + return Optional.of(Tuple2.of(ProducerIdZNode.decode(data), stat.getVersion())); + } catch (KeeperException.NoNodeException e) { + return Optional.empty(); + } + } + + /** + * Deletes the producer offset snapshot for the given producer ID only if the version matches. + * + *

This provides optimistic concurrency control - the delete will only succeed if no other + * process has modified the snapshot since it was read. + * + * @param producerId the producer ID + * @param expectedVersion the expected ZK version (obtained from getProducerSnapshotWithVersion) + * @return true if deleted successfully, false if version mismatch (snapshot was modified) + * @throws Exception if the operation fails for reasons other than version mismatch + */ + public boolean deleteProducerSnapshotIfVersion(String producerId, int expectedVersion) + throws Exception { + String path = ProducerIdZNode.path(producerId); + try { + zkClient.delete().withVersion(expectedVersion).forPath(path); + LOG.info( + "Deleted producer snapshot for producer {} at path {} with version {}.", + producerId, + path, + expectedVersion); + return true; + } catch (KeeperException.BadVersionException e) { + LOG.debug( + "Failed to delete producer snapshot for producer {} - version mismatch " + + "(expected {}, snapshot was modified by another process).", + producerId, + expectedVersion); + return false; + } catch (KeeperException.NoNodeException e) { + LOG.debug( + "Producer snapshot for producer {} was already deleted by another process.", + producerId); + return true; // Already deleted, consider it success + } + } + + /** + * Lists all producer IDs that have registered snapshots. + * + * @return list of producer IDs + * @throws Exception if the operation fails + */ + public List listProducerIds() throws Exception { + return getChildren(ProducersZNode.path()); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 4672455672..6109545774 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -26,6 +26,8 @@ import org.apache.fluss.security.acl.ResourceType; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableJsonSerde; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshot; +import org.apache.fluss.server.zk.data.producer.ProducerSnapshotJsonSerde; import org.apache.fluss.utils.json.JsonSerdeUtils; import org.apache.fluss.utils.types.Tuple2; @@ -840,4 +842,67 @@ public static RebalanceTask decode(byte[] json) { return JsonSerdeUtils.readValue(json, RebalanceTaskJsonSerde.INSTANCE); } } + + // ------------------------------------------------------------------------------------------ + // ZNodes under "/producers/" + // ------------------------------------------------------------------------------------------ + + /** + * The znode for producers. This is the root node for all producer offset snapshots. The znode + * path is: + * + *

/producers + */ + public static final class ProducersZNode { + public static String path() { + return "/producers"; + } + } + + /** + * The znode for a specific producer's offset snapshot. The znode path is: + * + *

/producers/[producerId] + * + *

This znode stores {@link ProducerSnapshot} which contains: + * + *

    + *
  • expiration_time: TTL for automatic cleanup + *
  • tables: List of table offset metadata with paths to remote offset files + *
+ * + *

The actual offset data is stored in remote storage (e.g., OSS/S3) and referenced by the + * file paths in the metadata. + */ + public static final class ProducerIdZNode { + /** + * Returns the ZK path for the producer snapshot znode. + * + * @param producerId the producer ID (typically Flink job ID) + * @return the ZK path + */ + public static String path(String producerId) { + return ProducersZNode.path() + "/" + producerId; + } + + /** + * Encodes a ProducerSnapshot to JSON bytes for storage in ZK. + * + * @param snapshot the ProducerSnapshot to encode + * @return the encoded bytes + */ + public static byte[] encode(ProducerSnapshot snapshot) { + return JsonSerdeUtils.writeValueAsBytes(snapshot, ProducerSnapshotJsonSerde.INSTANCE); + } + + /** + * Decodes JSON bytes from ZK to a ProducerSnapshot. + * + * @param json the JSON bytes from ZK + * @return the decoded ProducerSnapshot + */ + public static ProducerSnapshot decode(byte[] json) { + return JsonSerdeUtils.readValue(json, ProducerSnapshotJsonSerde.INSTANCE); + } + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshot.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshot.java new file mode 100644 index 0000000000..556f99db9a --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshot.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.producer; + +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.server.zk.data.ZkData; + +import java.util.List; +import java.util.Objects; + +/** + * Represents producer offset snapshot metadata stored in {@link ZkData.ProducerIdZNode}. + * + *

This class stores metadata about producer offset snapshots, including: + * + *

    + *
  • Expiration time for TTL management + *
  • List of table offset metadata, each containing table ID and path to offset file + *
+ * + *

The actual offset data is stored in remote storage (e.g., OSS/S3) and referenced by the file + * paths in {@link TableOffsetMetadata}. + * + * @see ProducerSnapshotJsonSerde for JSON serialization and deserialization + */ +public class ProducerSnapshot { + + /** The expiration time in milliseconds since epoch for TTL management. */ + private final long expirationTime; + + /** List of table offset metadata, each containing table ID and offset file path. */ + private final List tableOffsets; + + /** + * Creates a new ProducerSnapshot. + * + * @param expirationTime the expiration time in milliseconds since epoch + * @param tableOffsets list of table offset metadata + */ + public ProducerSnapshot(long expirationTime, List tableOffsets) { + this.expirationTime = expirationTime; + this.tableOffsets = tableOffsets; + } + + /** + * Returns the expiration time in milliseconds since epoch. + * + * @return the expiration time + */ + public long getExpirationTime() { + return expirationTime; + } + + /** + * Returns the list of table offset metadata. + * + * @return list of table offset metadata + */ + public List getTableOffsets() { + return tableOffsets; + } + + /** + * Checks if this snapshot has expired. + * + * @param currentTimeMs the current time in milliseconds since epoch + * @return true if the snapshot has expired, false otherwise + */ + public boolean isExpired(long currentTimeMs) { + return currentTimeMs > expirationTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProducerSnapshot)) { + return false; + } + ProducerSnapshot that = (ProducerSnapshot) o; + return expirationTime == that.expirationTime + && Objects.equals(tableOffsets, that.tableOffsets); + } + + @Override + public int hashCode() { + return Objects.hash(expirationTime, tableOffsets); + } + + @Override + public String toString() { + return "ProducerSnapshot{" + + "expirationTime=" + + expirationTime + + ", tableOffsets=" + + tableOffsets + + '}'; + } + + /** Metadata for a single table's offset snapshot. */ + public static class TableOffsetMetadata { + + /** The table ID. */ + private final long tableId; + + /** The path to the offset file in remote storage. */ + private final FsPath offsetsPath; + + /** + * Creates a new TableOffsetMetadata. + * + * @param tableId the table ID + * @param offsetsPath the path to the offset file + */ + public TableOffsetMetadata(long tableId, FsPath offsetsPath) { + this.tableId = tableId; + this.offsetsPath = offsetsPath; + } + + /** + * Returns the table ID. + * + * @return the table ID + */ + public long getTableId() { + return tableId; + } + + /** + * Returns the path to the offset file. 
+ * + * @return the offset file path + */ + public FsPath getOffsetsPath() { + return offsetsPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TableOffsetMetadata)) { + return false; + } + TableOffsetMetadata that = (TableOffsetMetadata) o; + return tableId == that.tableId && Objects.equals(offsetsPath, that.offsetsPath); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, offsetsPath); + } + + @Override + public String toString() { + return "TableOffsetMetadata{" + + "tableId=" + + tableId + + ", offsetsPath=" + + offsetsPath + + '}'; + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java new file mode 100644 index 0000000000..98bec7c5f3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data.producer; + +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * JSON serializer and deserializer for {@link ProducerSnapshot}. + * + *

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java
new file mode 100644
index 0000000000..98bec7c5f3
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerde.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.producer;
+
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * JSON serializer and deserializer for {@link ProducerSnapshot}.
+ *
+ * <p>The serialized format is:
+ *
+ * <pre>{@code
+ * {
+ *   "version": 1,
+ *   "expiration_time": 1735538268000,
+ *   "tables": [
+ *     {
+ *       "table_id": 100,
+ *       "offsets_path": "oss://bucket/path/uuid.offsets"
+ *     }
+ *   ]
+ * }
+ * }</pre>
+ */
+public class ProducerSnapshotJsonSerde
+        implements JsonSerializer<ProducerSnapshot>, JsonDeserializer<ProducerSnapshot> {
+
+    public static final ProducerSnapshotJsonSerde INSTANCE = new ProducerSnapshotJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String EXPIRATION_TIME_KEY = "expiration_time";
+    private static final String TABLES_KEY = "tables";
+    private static final String TABLE_ID_KEY = "table_id";
+    private static final String OFFSETS_PATH_KEY = "offsets_path";
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public void serialize(ProducerSnapshot snapshot, JsonGenerator generator) throws IOException {
+        generator.writeStartObject();
+
+        generator.writeNumberField(VERSION_KEY, CURRENT_VERSION);
+        generator.writeNumberField(EXPIRATION_TIME_KEY, snapshot.getExpirationTime());
+
+        generator.writeArrayFieldStart(TABLES_KEY);
+        for (ProducerSnapshot.TableOffsetMetadata tableOffset : snapshot.getTableOffsets()) {
+            generator.writeStartObject();
+            generator.writeNumberField(TABLE_ID_KEY, tableOffset.getTableId());
+            generator.writeStringField(OFFSETS_PATH_KEY, tableOffset.getOffsetsPath().toString());
+            generator.writeEndObject();
+        }
+        generator.writeEndArray();
+
+        generator.writeEndObject();
+    }
+
+    @Override
+    public ProducerSnapshot deserialize(JsonNode node) {
+        int version = node.get(VERSION_KEY).asInt();
+        if (version != CURRENT_VERSION) {
+            throw new IllegalArgumentException(
+                    "Unsupported ProducerSnapshot version: "
+                            + version
+                            + ", expected: "
+                            + CURRENT_VERSION);
+        }
+
+        long expirationTime = node.get(EXPIRATION_TIME_KEY).asLong();
+
+        List<ProducerSnapshot.TableOffsetMetadata> tableOffsets = new ArrayList<>();
+        JsonNode tablesNode = node.get(TABLES_KEY);
+        if (tablesNode != null && tablesNode.isArray()) {
+            Iterator<JsonNode> elements = tablesNode.elements();
+            while (elements.hasNext()) {
+                JsonNode tableNode = elements.next();
+                long tableId = tableNode.get(TABLE_ID_KEY).asLong();
+                String offsetsPath = tableNode.get(OFFSETS_PATH_KEY).asText();
+                tableOffsets.add(
+                        new ProducerSnapshot.TableOffsetMetadata(
+                                tableId, new FsPath(offsetsPath)));
+            }
+        }
+
+        return new ProducerSnapshot(expirationTime, tableOffsets);
+    }
+}
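[Editor's note] Since the serde is used symmetrically on write and read, a round-trip sketch may help reviewers. Only the `serialize`/`deserialize` signatures are taken from the class above; `ObjectMapper` from the shaded Jackson package and the wrapper class name are assumptions:

```java
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.StringWriter;

class SerdeRoundTrip {
    static ProducerSnapshot roundTrip(ProducerSnapshot snapshot) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        StringWriter out = new StringWriter();
        // Write the snapshot as JSON using the serde's streaming serializer.
        try (JsonGenerator gen = mapper.getFactory().createGenerator(out)) {
            ProducerSnapshotJsonSerde.INSTANCE.serialize(snapshot, gen);
        }
        // Parse the JSON back into a tree and rebuild the snapshot from it.
        return ProducerSnapshotJsonSerde.INSTANCE.deserialize(mapper.readTree(out.toString()));
    }
}
```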
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 06d15d95ed..a0a49e027b 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -52,6 +52,8 @@
 import org.apache.fluss.rpc.messages.CreateTableResponse;
 import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
 import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
+import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest;
+import org.apache.fluss.rpc.messages.DeleteProducerOffsetsResponse;
 import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
 import org.apache.fluss.rpc.messages.DescribeClusterConfigsResponse;
 import org.apache.fluss.rpc.messages.DropAclsRequest;
@@ -72,6 +74,8 @@
 import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
 import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
 import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotResponse;
+import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
+import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
 import org.apache.fluss.rpc.messages.GetTableInfoRequest;
 import org.apache.fluss.rpc.messages.GetTableInfoResponse;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
@@ -94,6 +98,8 @@
 import org.apache.fluss.rpc.messages.PrepareLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.messages.RebalanceRequest;
 import org.apache.fluss.rpc.messages.RebalanceResponse;
+import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
+import org.apache.fluss.rpc.messages.RegisterProducerOffsetsResponse;
 import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
 import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
 import org.apache.fluss.rpc.messages.TableExistsRequest;
@@ -418,6 +424,24 @@ public CompletableFuture<DescribeClusterConfigsResponse> describeClusterConfigs(
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<RegisterProducerOffsetsResponse> registerProducerOffsets(
+            RegisterProducerOffsetsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets(
+            GetProducerOffsetsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<DeleteProducerOffsetsResponse> deleteProducerOffsets(
+            DeleteProducerOffsetsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     public void setCurrentLeaderEpoch(TableBucket tableBucket, int leaderEpoch) {
         currentLeaderEpoch.put(tableBucket, leaderEpoch);
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManagerTest.java
new file mode 100644
index 0000000000..42981d7f4d
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManagerTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ProducerSnapshotManager}. */
+class ProducerSnapshotManagerTest {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(1)
+                    .setClusterConf(new Configuration())
+                    .build();
+
+    @TempDir Path tempDir;
+
+    private ProducerSnapshotManager manager;
+    private ProducerSnapshotStore store;
+    private ZooKeeperClient zkClient;
+
+    private static final long DEFAULT_TTL_MS = 3600000; // 1 hour
+    private static final long CLEANUP_INTERVAL_MS = 60000; // 1 minute
+
+    @BeforeEach
+    void setUp() {
+        zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        store = new ProducerSnapshotStore(zkClient, tempDir.toString());
+        manager = new ProducerSnapshotManager(store, DEFAULT_TTL_MS, CLEANUP_INTERVAL_MS);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (manager != null) {
+            manager.close();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // Basic Registration Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testRegisterSnapshotSuccess() throws Exception {
+        String producerId = "test-manager-register";
+        Map<TableBucket, Long> offsets = createTestOffsets();
+
+        boolean created = manager.registerSnapshot(producerId, offsets, null);
+
+        assertThat(created).isTrue();
+        Optional<ProducerSnapshot> snapshot = manager.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().getExpirationTime()).isGreaterThan(System.currentTimeMillis());
+    }
+
+    @Test
+    void testRegisterSnapshotAlreadyExists() throws Exception {
+        String producerId = "test-manager-exists";
+        Map<TableBucket, Long> offsets1 = createTestOffsets();
+        Map<TableBucket, Long> offsets2 = new HashMap<>();
+        offsets2.put(new TableBucket(99L, 0), 999L);
+
+        // First registration should succeed
+        assertThat(manager.registerSnapshot(producerId, offsets1, null)).isTrue();
+
+        // Second registration should return false (already exists)
+        assertThat(manager.registerSnapshot(producerId, offsets2, null)).isFalse();
+
+        // Original offsets should be preserved
+        Map<TableBucket, Long> retrieved = manager.getOffsets(producerId);
+        assertThat(retrieved).containsKey(new TableBucket(1L, 0));
+        assertThat(retrieved).doesNotContainKey(new TableBucket(99L, 0));
+    }
+
+    @Test
+    void testRegisterSnapshotWithCustomTtl() throws Exception {
+        String producerId = "test-manager-custom-ttl";
+        Map<TableBucket, Long> offsets = createTestOffsets();
+        long customTtlMs = 1000; // 1 second
+
+        long beforeRegister = System.currentTimeMillis();
+        manager.registerSnapshot(producerId, offsets, customTtlMs);
+        long afterRegister = System.currentTimeMillis();
+
+        Optional<ProducerSnapshot> snapshot = manager.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        long expirationTime = snapshot.get().getExpirationTime();
+        assertThat(expirationTime).isGreaterThanOrEqualTo(beforeRegister + customTtlMs);
+        assertThat(expirationTime).isLessThanOrEqualTo(afterRegister + customTtlMs);
+    }
+
+    // ------------------------------------------------------------------------
+    // Expired Snapshot Replacement Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testRegisterSnapshotReplacesExpired() throws Exception {
+        String producerId = "test-manager-replace-expired";
+        Map<TableBucket, Long> offsets1 = new HashMap<>();
+        offsets1.put(new TableBucket(1L, 0), 100L);
+
+        Map<TableBucket, Long> offsets2 = new HashMap<>();
+        offsets2.put(new TableBucket(2L, 0), 200L);
+
+        // Register with very short TTL (already expired)
+        long expiredTtlMs = -1000; // Already expired
+        store.tryStoreSnapshot(producerId, offsets1, System.currentTimeMillis() + expiredTtlMs);
+
+        // Verify it's expired
+        Optional<ProducerSnapshot> expiredSnapshot = manager.getSnapshotMetadata(producerId);
+        assertThat(expiredSnapshot).isPresent();
+        assertThat(expiredSnapshot.get().isExpired(System.currentTimeMillis())).isTrue();
+
+        // New registration should replace the expired one
+        boolean created = manager.registerSnapshot(producerId, offsets2, null);
+        assertThat(created).isTrue();
+
+        // Verify new offsets
+        Map<TableBucket, Long> retrieved = manager.getOffsets(producerId);
+        assertThat(retrieved).containsKey(new TableBucket(2L, 0));
+        assertThat(retrieved).doesNotContainKey(new TableBucket(1L, 0));
+    }
+
+    // ------------------------------------------------------------------------
+    // Get and Delete Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testGetSnapshotMetadata() throws Exception {
+        String producerId = "test-manager-get-metadata";
+        Map<TableBucket, Long> offsets = createTestOffsets();
+
+        manager.registerSnapshot(producerId, offsets, null);
+
+        Optional<ProducerSnapshot> snapshot = manager.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().getTableOffsets()).isNotEmpty();
+    }
+
+    @Test
+    void testGetSnapshotMetadataNonExistent() throws Exception {
+        Optional<ProducerSnapshot> snapshot = manager.getSnapshotMetadata("non-existent-producer");
+        assertThat(snapshot).isEmpty();
+    }
+
+    @Test
+    void testGetOffsets() throws Exception {
+        String producerId = "test-manager-get-offsets";
+        Map<TableBucket, Long> offsets = createTestOffsets();
+
+        manager.registerSnapshot(producerId, offsets, null);
+
+        Map<TableBucket, Long> retrieved = manager.getOffsets(producerId);
+        assertThat(retrieved).hasSize(offsets.size());
+        assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+    }
+
+    @Test
+    void testGetOffsetsNonExistent() throws Exception {
+        Map<TableBucket, Long> offsets = manager.getOffsets("non-existent-producer");
+        assertThat(offsets).isEmpty();
+    }
+
+    @Test
+    void testDeleteSnapshot() throws Exception {
+        String producerId = "test-manager-delete";
+        Map<TableBucket, Long> offsets = createTestOffsets();
+
+        manager.registerSnapshot(producerId, offsets, null);
+        assertThat(manager.getSnapshotMetadata(producerId)).isPresent();
+
+        manager.deleteSnapshot(producerId);
+
+        assertThat(manager.getSnapshotMetadata(producerId)).isEmpty();
+        assertThat(manager.getOffsets(producerId)).isEmpty();
+    }
+
+    @Test
+    void testDeleteNonExistentSnapshot() throws Exception {
+        // Should not throw
+        manager.deleteSnapshot("non-existent-producer");
+    }
+
+    // ------------------------------------------------------------------------
+    // Cleanup Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testCleanupExpiredSnapshots() throws Exception {
+        String expiredProducerId = "test-cleanup-expired";
+        String validProducerId = "test-cleanup-valid";
+
+        // Create an expired snapshot
+        Map<TableBucket, Long> offsets = createTestOffsets();
+        store.tryStoreSnapshot(expiredProducerId, offsets, System.currentTimeMillis() - 1000);
+
+        // Create a valid snapshot
+        manager.registerSnapshot(validProducerId, offsets, null);
+
+        // Run cleanup
+        int cleanedCount = manager.cleanupExpiredSnapshots();
+
+        assertThat(cleanedCount).isEqualTo(1);
+        assertThat(manager.getSnapshotMetadata(expiredProducerId)).isEmpty();
+        assertThat(manager.getSnapshotMetadata(validProducerId)).isPresent();
+    }
+
+    @Test
+    void testGetDefaultTtlMs() {
+        assertThat(manager.getDefaultTtlMs()).isEqualTo(DEFAULT_TTL_MS);
+    }
+
+    // ------------------------------------------------------------------------
+    // Concurrent Registration Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testConcurrentRegistrationSameProducer() throws Exception {
+        String producerId = "test-concurrent-same";
+        int numThreads = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(numThreads);
+        AtomicInteger createdCount = new AtomicInteger(0);
+        AtomicInteger existsCount = new AtomicInteger(0);
+
+        for (int i = 0; i < numThreads; i++) {
+            final int threadId = i;
+            executor.submit(
+                    () -> {
+                        try {
+                            startLatch.await();
+                            Map<TableBucket, Long> offsets = new HashMap<>();
+                            offsets.put(new TableBucket(1L, threadId), (long) threadId);
+
+                            boolean created = manager.registerSnapshot(producerId, offsets, null);
+                            if (created) {
+                                createdCount.incrementAndGet();
+                            } else {
+                                existsCount.incrementAndGet();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            doneLatch.countDown();
+                        }
+                    });
+        }
+
+        // Start all threads simultaneously and fail fast on timeout
+        startLatch.countDown();
+        assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue();
+        executor.shutdown();
+
+        // Exactly one should succeed with CREATED
+        assertThat(createdCount.get()).isEqualTo(1);
+        assertThat(existsCount.get()).isEqualTo(numThreads - 1);
+
+        // Verify snapshot exists
+        assertThat(manager.getSnapshotMetadata(producerId)).isPresent();
+    }
+
+    @Test
+    void testConcurrentRegistrationDifferentProducers() throws Exception {
+        int numProducers = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(numProducers);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(numProducers);
+        AtomicInteger successCount = new AtomicInteger(0);
+
+        for (int i = 0; i < numProducers; i++) {
+            final int producerNum = i;
+            executor.submit(
+                    () -> {
+                        try {
+                            startLatch.await();
+                            String producerId = "test-concurrent-diff-" + producerNum;
+                            Map<TableBucket, Long> offsets = new HashMap<>();
+                            offsets.put(new TableBucket(1L, producerNum), (long) producerNum);
+
+                            boolean created = manager.registerSnapshot(producerId, offsets, null);
+                            if (created) {
+                                successCount.incrementAndGet();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        } finally {
+                            doneLatch.countDown();
+                        }
+                    });
+        }
+
+        // Start all threads simultaneously and fail fast on timeout
+        startLatch.countDown();
+        assertThat(doneLatch.await(30, TimeUnit.SECONDS)).isTrue();
+        executor.shutdown();
+
+        // All should succeed since they're different producers
+        assertThat(successCount.get()).isEqualTo(numProducers);
+
+        // Verify all snapshots exist
+        for (int i = 0; i < numProducers; i++) {
+            assertThat(manager.getSnapshotMetadata("test-concurrent-diff-" + i)).isPresent();
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    // Helper Methods
+    // ------------------------------------------------------------------------
+
+    private Map<TableBucket, Long> createTestOffsets() {
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+        offsets.put(new TableBucket(1L, 1), 200L);
+        offsets.put(new TableBucket(2L, 0), 300L);
+        return offsets;
+    }
+}
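[Editor's note] Taken together, the tests above pin down the manager's contract: exactly one concurrent caller observes CREATED, and only an expired snapshot may be replaced. A condensed sketch of one way to satisfy that contract, using only the store methods exercised by the tests in this patch; this is an illustration, not the actual `ProducerSnapshotManager` implementation:

```java
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
import org.apache.fluss.utils.types.Tuple2;

import java.util.Map;
import java.util.Optional;

class RegisterFlowSketch {
    private final ProducerSnapshotStore store;
    private final long defaultTtlMs;

    RegisterFlowSketch(ProducerSnapshotStore store, long defaultTtlMs) {
        this.store = store;
        this.defaultTtlMs = defaultTtlMs;
    }

    /** Returns true iff this call created the snapshot (CREATED vs ALREADY_EXISTS). */
    boolean registerSnapshot(String producerId, Map<TableBucket, Long> offsets, Long ttlMs)
            throws Exception {
        long expiration = System.currentTimeMillis() + (ttlMs != null ? ttlMs : defaultTtlMs);
        if (store.tryStoreSnapshot(producerId, offsets, expiration)) {
            return true; // first writer wins: snapshot created
        }
        // A snapshot already exists; replace it only if it has expired, using the
        // ZooKeeper node version as a CAS guard so concurrent replacers cannot both win.
        Optional<Tuple2<ProducerSnapshot, Integer>> existing =
                store.getSnapshotMetadataWithVersion(producerId);
        if (existing.isPresent()
                && existing.get().f0.isExpired(System.currentTimeMillis())
                && store.deleteSnapshotIfVersion(
                        producerId, existing.get().f0, existing.get().f1)) {
            return store.tryStoreSnapshot(producerId, offsets, expiration);
        }
        return false; // a live snapshot already exists
    }
}
```

The create-if-absent call carries the race; everything after it only handles the expired-snapshot path, which is exactly the split the concurrency tests assert on.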
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java
new file mode 100644
index 0000000000..ebe2898e83
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ProducerSnapshotStore}. */
+class ProducerSnapshotStoreTest {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(1)
+                    .setClusterConf(new Configuration())
+                    .build();
+
+    @TempDir Path tempDir;
+
+    private ProducerSnapshotStore store;
+    private ZooKeeperClient zkClient;
+
+    @BeforeEach
+    void setUp() {
+        zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        store = new ProducerSnapshotStore(zkClient, tempDir.toString());
+    }
+
+    @Test
+    void testTryStoreSnapshotSuccess() throws Exception {
+        String producerId = "test-producer-store";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+        offsets.put(new TableBucket(1L, 1), 200L);
+        offsets.put(new TableBucket(2L, 0), 300L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+
+        // First store should succeed
+        boolean created = store.tryStoreSnapshot(producerId, offsets, expirationTime);
+        assertThat(created).isTrue();
+
+        // Verify metadata was stored
+        Optional<ProducerSnapshot> snapshot = store.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().getExpirationTime()).isEqualTo(expirationTime);
+        assertThat(snapshot.get().getTableOffsets()).hasSize(2); // 2 tables
+    }
+
+    @Test
+    void testTryStoreSnapshotAlreadyExists() throws Exception {
+        String producerId = "test-producer-exists";
+        Map<TableBucket, Long> offsets1 = new HashMap<>();
+        offsets1.put(new TableBucket(1L, 0), 100L);
+
+        Map<TableBucket, Long> offsets2 = new HashMap<>();
+        offsets2.put(new TableBucket(1L, 0), 999L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+
+        // First store should succeed
+        assertThat(store.tryStoreSnapshot(producerId, offsets1, expirationTime)).isTrue();
+
+        // Second store should return false (already exists)
+        assertThat(store.tryStoreSnapshot(producerId, offsets2, expirationTime)).isFalse();
+
+        // Original offsets should be preserved
+        Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+        assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+    }
+
+    @Test
+    void testReadOffsets() throws Exception {
+        String producerId = "test-producer-read";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+        offsets.put(new TableBucket(1L, 1), 200L);
+        offsets.put(new TableBucket(2L, 0), 300L);
+        offsets.put(new TableBucket(3L, 100L, 0), 400L); // partitioned table
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Read offsets back
+        Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+        assertThat(retrieved).hasSize(4);
+        assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+        assertThat(retrieved.get(new TableBucket(1L, 1))).isEqualTo(200L);
+        assertThat(retrieved.get(new TableBucket(2L, 0))).isEqualTo(300L);
+        assertThat(retrieved.get(new TableBucket(3L, 100L, 0))).isEqualTo(400L);
+    }
+
+    @Test
+    void testReadOffsetsNonExistent() throws Exception {
+        Map<TableBucket, Long> offsets = store.readOffsets("non-existent-producer");
+        assertThat(offsets).isEmpty();
+    }
+
+    @Test
+    void testDeleteSnapshot() throws Exception {
+        String producerId = "test-producer-delete";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Verify it exists
+        assertThat(store.getSnapshotMetadata(producerId)).isPresent();
+
+        // Delete
+        store.deleteSnapshot(producerId);
+
+        // Verify it's gone
+        assertThat(store.getSnapshotMetadata(producerId)).isEmpty();
+        assertThat(store.readOffsets(producerId)).isEmpty();
+    }
+
+    @Test
+    void testDeleteNonExistentSnapshot() throws Exception {
+        // Should not throw
+        store.deleteSnapshot("non-existent-producer");
+    }
+
+    @Test
+    void testListProducerIds() throws Exception {
+        // Use unique prefix to avoid conflicts with other tests
+        String prefix = "list-test-" + System.currentTimeMillis() + "-";
+
+        // Create multiple snapshots
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+        long expirationTime = System.currentTimeMillis() + 3600000;
+
+        store.tryStoreSnapshot(prefix + "producer-1", offsets, expirationTime);
+        store.tryStoreSnapshot(prefix + "producer-2", offsets, expirationTime);
+        store.tryStoreSnapshot(prefix + "producer-3", offsets, expirationTime);
+
+        List<String> producerIds = store.listProducerIds();
+        assertThat(producerIds)
+                .contains(prefix + "producer-1", prefix + "producer-2", prefix + "producer-3");
+    }
+
+    @Test
+    void testGetProducersDirectory() {
+        FsPath producersDir = store.getProducersDirectory();
+        assertThat(producersDir.toString()).isEqualTo(tempDir.toString() + "/producers");
+    }
+
+    @Test
+    void testSnapshotWithPartitionedTable() throws Exception {
+        String producerId = "test-producer-partitioned";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        // Non-partitioned table
+        offsets.put(new TableBucket(1L, 0), 100L);
+        // Partitioned table with different partitions
+        offsets.put(new TableBucket(2L, 10L, 0), 200L);
+        offsets.put(new TableBucket(2L, 10L, 1), 201L);
+        offsets.put(new TableBucket(2L, 20L, 0), 300L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+        assertThat(retrieved).hasSize(4);
+        assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+        assertThat(retrieved.get(new TableBucket(2L, 10L, 0))).isEqualTo(200L);
+        assertThat(retrieved.get(new TableBucket(2L, 10L, 1))).isEqualTo(201L);
+        assertThat(retrieved.get(new TableBucket(2L, 20L, 0))).isEqualTo(300L);
+    }
+
+    @Test
+    void testSnapshotExpiration() throws Exception {
+        String producerId = "test-producer-expiration";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        // Create snapshot that expires in the past
+        long pastExpirationTime = System.currentTimeMillis() - 1000;
+        store.tryStoreSnapshot(producerId, offsets, pastExpirationTime);
+
+        Optional<ProducerSnapshot> snapshot = store.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().isExpired(System.currentTimeMillis())).isTrue();
+    }
+
+    @Test
+    void testEmptyOffsets() throws Exception {
+        String producerId = "test-producer-empty";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        Optional<ProducerSnapshot> snapshot = store.getSnapshotMetadata(producerId);
+        assertThat(snapshot).isPresent();
+        assertThat(snapshot.get().getTableOffsets()).isEmpty();
+
+        Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+        assertThat(retrieved).isEmpty();
+    }
+
+    // ------------------------------------------------------------------------
+    // Version Control Tests
+    // ------------------------------------------------------------------------
+
+    @Test
+    void testGetSnapshotMetadataWithVersion() throws Exception {
+        String producerId = "test-producer-version";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Get with version
+        Optional<Tuple2<ProducerSnapshot, Integer>> result =
+                store.getSnapshotMetadataWithVersion(producerId);
+
+        assertThat(result).isPresent();
+        assertThat(result.get().f0.getExpirationTime()).isEqualTo(expirationTime);
+        assertThat(result.get().f1).isGreaterThanOrEqualTo(0); // ZK version starts at 0
+    }
+
+    @Test
+    void testGetSnapshotMetadataWithVersionNonExistent() throws Exception {
+        Optional<Tuple2<ProducerSnapshot, Integer>> result =
+                store.getSnapshotMetadataWithVersion("non-existent-producer");
+
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    void testDeleteSnapshotIfVersionSuccess() throws Exception {
+        String producerId = "test-producer-delete-version";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Get current version
+        Optional<Tuple2<ProducerSnapshot, Integer>> result =
+                store.getSnapshotMetadataWithVersion(producerId);
+        assertThat(result).isPresent();
+        int version = result.get().f1;
+        ProducerSnapshot snapshot = result.get().f0;
+
+        // Delete with correct version should succeed
+        boolean deleted = store.deleteSnapshotIfVersion(producerId, snapshot, version);
+        assertThat(deleted).isTrue();
+
+        // Verify deleted
+        assertThat(store.getSnapshotMetadata(producerId)).isEmpty();
+    }
+
+    @Test
+    void testDeleteSnapshotIfVersionMismatch() throws Exception {
+        String producerId = "test-producer-version-mismatch";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Get current version
+        Optional<Tuple2<ProducerSnapshot, Integer>> result =
+                store.getSnapshotMetadataWithVersion(producerId);
+        assertThat(result).isPresent();
+        ProducerSnapshot snapshot = result.get().f0;
+
+        // Delete with wrong version should fail
+        int wrongVersion = 999;
+        boolean deleted = store.deleteSnapshotIfVersion(producerId, snapshot, wrongVersion);
+        assertThat(deleted).isFalse();
+
+        // Snapshot should still exist
+        assertThat(store.getSnapshotMetadata(producerId)).isPresent();
+    }
+
+    @Test
+    void testDeleteSnapshotIfVersionAlreadyDeleted() throws Exception {
+        String producerId = "test-producer-already-deleted";
+        Map<TableBucket, Long> offsets = new HashMap<>();
+        offsets.put(new TableBucket(1L, 0), 100L);
+
+        long expirationTime = System.currentTimeMillis() + 3600000;
+        store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+        // Get snapshot info
+        Optional<Tuple2<ProducerSnapshot, Integer>> result =
+                store.getSnapshotMetadataWithVersion(producerId);
+        assertThat(result).isPresent();
+        ProducerSnapshot snapshot = result.get().f0;
+
+        // Delete first
+        store.deleteSnapshot(producerId);
+
+        // Delete again with version should return true (already deleted is success)
+        boolean deleted = store.deleteSnapshotIfVersion(producerId, snapshot, 0);
+        assertThat(deleted).isTrue();
+    }
+}
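[Editor's note] The version-control tests above lean on ZooKeeper's per-node data version. For reference, this is roughly what a conditional delete looks like against a raw Curator client; the wiring below is hypothetical, since the store in this patch goes through Fluss's `ZooKeeperClient` wrapper rather than Curator directly:

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;

class ConditionalDeleteSketch {
    static boolean deleteIfVersion(CuratorFramework zk, String path, int expectedVersion)
            throws Exception {
        try {
            // Fails with BadVersionException if the node changed since it was read.
            zk.delete().withVersion(expectedVersion).forPath(path);
            return true;
        } catch (KeeperException.BadVersionException e) {
            return false; // a concurrent re-registration won the race
        } catch (KeeperException.NoNodeException e) {
            return true; // already gone: deletion is idempotent, matching the test above
        }
    }
}
```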
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerdeTest.java
new file mode 100644
index 0000000000..77b5c223e6
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/producer/ProducerSnapshotJsonSerdeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.producer;
+
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.utils.json.JsonSerdeTestBase;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Tests for {@link ProducerSnapshotJsonSerde}. */
+class ProducerSnapshotJsonSerdeTest extends JsonSerdeTestBase<ProducerSnapshot> {
+
+    ProducerSnapshotJsonSerdeTest() {
+        super(ProducerSnapshotJsonSerde.INSTANCE);
+    }
+
+    @Override
+    protected ProducerSnapshot[] createObjects() {
+        // Empty snapshot
+        ProducerSnapshot empty = new ProducerSnapshot(1735538268000L, Collections.emptyList());
+
+        // Snapshot with multiple tables and different file system schemes
+        ProducerSnapshot withTables =
+                new ProducerSnapshot(
+                        1735538268000L,
+                        Arrays.asList(
+                                new ProducerSnapshot.TableOffsetMetadata(
+                                        100L, new FsPath("oss://bucket/path/uuid1.offsets")),
+                                new ProducerSnapshot.TableOffsetMetadata(
+                                        200L, new FsPath("s3://bucket/path/uuid2.offsets"))));
+
+        return new ProducerSnapshot[] {empty, withTables};
+    }
+
+    @Override
+    protected String[] expectedJsons() {
+        return new String[] {
+            "{\"version\":1,\"expiration_time\":1735538268000,\"tables\":[]}",
+            "{\"version\":1,\"expiration_time\":1735538268000,\"tables\":["
+                    + "{\"table_id\":100,\"offsets_path\":\"oss://bucket/path/uuid1.offsets\"},"
+                    + "{\"table_id\":200,\"offsets_path\":\"s3://bucket/path/uuid2.offsets\"}]}"
+        };
+    }
+}
diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md
index 44d6e6278d..31880d4d87 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -53,9 +53,11 @@ during the Fluss cluster working.
 
 ## CoordinatorServer
 
-| Option                   | Type    | Default | Description |
-|--------------------------|---------|---------|-------------|
-| coordinator.io-pool.size | Integer | 10      | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The size of the IO thread pool to run blocking operations for coordinator server. This includes discard unnecessary snapshot files. Increase this value if you experience slow unnecessary snapshot files clean. The default value is 10. |
+| Option                                         | Type     | Default | Description |
+|------------------------------------------------|----------|---------|-------------|
+| coordinator.io-pool.size                       | Integer  | 10      | **Deprecated**: This option is deprecated. Please use `server.io-pool.size` instead. The size of the IO thread pool to run blocking operations for the coordinator server, including discarding unnecessary snapshot files. Increase this value if cleanup of unnecessary snapshot files is slow. The default value is 10. |
+| coordinator.producer-snapshot.ttl              | Duration | 24h     | The TTL (time-to-live) for producer offset snapshots. Snapshots older than this TTL will be automatically cleaned up by the coordinator server. Producer offset snapshots are used for undo recovery when a Flink job fails over before completing its first checkpoint. The default value is 24 hours. |
+| coordinator.producer-snapshot.cleanup-interval | Duration | 1h      | The interval for cleaning up expired producer offset snapshots and orphan files in remote storage. The cleanup task runs periodically to remove expired snapshots and any orphan files that may have been left behind due to incomplete operations. The default value is 1 hour. |
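[Editor's note] To make the two new options concrete, an operator would override them in the cluster configuration. The option keys come from the table above; the file name and values below are illustrative:

```yaml
# Illustrative overrides; the defaults are 24h and 1h respectively.
coordinator.producer-snapshot.ttl: 12h
coordinator.producer-snapshot.cleanup-interval: 30min
```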
 
 ## TabletServer