Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -604,4 +605,65 @@ CompletableFuture<Optional<RebalanceProgress>> listRebalanceProgress(
* NoRebalanceInProgressException} will be thrown.
*/
CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId);

// ==================================================================================
// Producer Offset Management APIs (for Exactly-Once Semantics)
// ==================================================================================

/**
* Register producer offset snapshot.
*
* <p>This method provides atomic "check and register" semantics:
*
* <ul>
* <li>If snapshot does not exist: create new snapshot and return {@link
* RegisterResult#CREATED}
* <li>If snapshot already exists: do NOT overwrite and return {@link
* RegisterResult#ALREADY_EXISTS}
* </ul>
*
* <p>The atomicity is guaranteed by the server implementation. This enables the caller to
* determine whether undo recovery is needed based on the return value.
*
* <p>The snapshot will be automatically cleaned up after the configured TTL expires.
*
* <p>This API is typically used by Flink Operator Coordinator at job startup to register the
* initial offset snapshot before any data is written.
*
* @param producerId the ID of the producer (typically Flink job ID)
* @param offsets map of TableBucket to offset for all tables
* @return a CompletableFuture containing the registration result indicating whether the
* snapshot was newly created or already existed
* @since 0.9
*/
CompletableFuture<RegisterResult> registerProducerOffsets(
String producerId, Map<TableBucket, Long> offsets);

/**
* Get producer offset snapshot.
*
* <p>This method retrieves the registered offset snapshot for a producer. Returns null if no
* snapshot exists for the given producer ID.
*
* <p>This API is typically used by Flink Operator Coordinator at job startup to check if a
* previous snapshot exists (indicating a failover before first checkpoint).
*
* @param producerId the ID of the producer
* @return a CompletableFuture containing the producer offsets, or null if not found
* @since 0.9
*/
CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId);

/**
* Delete producer offset snapshot.
*
* <p>This method deletes the registered offset snapshot for a producer. This is typically
* called after the first checkpoint completes successfully, as the checkpoint state will be
* used for recovery instead of the initial snapshot.
*
* @param producerId the ID of the producer
* @return a CompletableFuture that completes when deletion succeeds
* @since 0.9
*/
CompletableFuture<Void> deleteProducerOffsets(String producerId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.fluss.rpc.messages.CreateTableRequest;
import org.apache.fluss.rpc.messages.DatabaseExistsRequest;
import org.apache.fluss.rpc.messages.DatabaseExistsResponse;
import org.apache.fluss.rpc.messages.DeleteProducerOffsetsRequest;
import org.apache.fluss.rpc.messages.DescribeClusterConfigsRequest;
import org.apache.fluss.rpc.messages.DropAclsRequest;
import org.apache.fluss.rpc.messages.DropDatabaseRequest;
Expand All @@ -65,6 +66,7 @@
import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
import org.apache.fluss.rpc.messages.GetLatestLakeSnapshotRequest;
import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
import org.apache.fluss.rpc.messages.GetTableInfoRequest;
import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
import org.apache.fluss.rpc.messages.ListAclsRequest;
Expand All @@ -76,11 +78,15 @@
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.ListTablesResponse;
import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbBucketOffset;
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTableBucketOffset;
import org.apache.fluss.rpc.messages.PbTableOffsets;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.messages.RebalanceRequest;
import org.apache.fluss.rpc.messages.RebalanceResponse;
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
Expand Down Expand Up @@ -588,6 +594,97 @@ public CompletableFuture<Void> cancelRebalance(@Nullable String rebalanceId) {
return gateway.cancelRebalance(request).thenApply(r -> null);
}

// ==================================================================================
// Producer Offset Management APIs (for Exactly-Once Semantics)
// ==================================================================================

@Override
public CompletableFuture<RegisterResult> registerProducerOffsets(
String producerId, Map<TableBucket, Long> offsets) {
checkNotNull(producerId, "producerId must not be null");
checkNotNull(offsets, "offsets must not be null");

RegisterProducerOffsetsRequest request = new RegisterProducerOffsetsRequest();
request.setProducerId(producerId);

// Convert TableBucket offsets to PbTableBucketOffset
for (Map.Entry<TableBucket, Long> entry : offsets.entrySet()) {
TableBucket bucket = entry.getKey();
PbTableBucketOffset pbOffset =
request.addBucketOffset()
.setTableId(bucket.getTableId())
.setBucketId(bucket.getBucket())
.setOffset(entry.getValue());
if (bucket.getPartitionId() != null) {
pbOffset.setPartitionId(bucket.getPartitionId());
}
}

return gateway.registerProducerOffsets(request)
.thenApply(
response -> {
// result: 0 = CREATED, 1 = ALREADY_EXISTS
int result = response.hasResult() ? response.getResult() : 0;
return result == 0
? RegisterResult.CREATED
: RegisterResult.ALREADY_EXISTS;
});
}

@Override
public CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String producerId) {
checkNotNull(producerId, "producerId must not be null");

GetProducerOffsetsRequest request = new GetProducerOffsetsRequest();
request.setProducerId(producerId);

return gateway.getProducerOffsets(request)
.thenApply(
response -> {
if (!response.hasProducerId()) {
// No snapshot found for this producer
return null;
}

Map<Long, Map<TableBucket, Long>> tableOffsets = new HashMap<>();
for (PbTableOffsets pbTableOffsets : response.getTableOffsetsList()) {
long tableId = pbTableOffsets.getTableId();
Map<TableBucket, Long> bucketOffsets = new HashMap<>();

for (PbBucketOffset pbBucketOffset :
pbTableOffsets.getBucketOffsetsList()) {
Long partitionId =
pbBucketOffset.hasPartitionId()
? pbBucketOffset.getPartitionId()
: null;
TableBucket bucket =
new TableBucket(
tableId,
partitionId,
pbBucketOffset.getBucketId());
bucketOffsets.put(bucket, pbBucketOffset.getLogEndOffset());
}

tableOffsets.put(tableId, bucketOffsets);
}

long expirationTime =
response.hasExpirationTime() ? response.getExpirationTime() : 0;
return new ProducerOffsetsResult(
response.getProducerId(), tableOffsets, expirationTime);
});
}

@Override
public CompletableFuture<Void> deleteProducerOffsets(String producerId) {
checkNotNull(producerId, "producerId must not be null");

DeleteProducerOffsetsRequest request = new DeleteProducerOffsetsRequest();
request.setProducerId(producerId);

return gateway.deleteProducerOffsets(request).thenApply(r -> null);
}

@Override
public void close() {
// nothing to do yet
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.metadata.TableBucket;

import java.util.Collections;
import java.util.Map;

/**
* Result containing producer offset snapshot data.
*
* <p>This class holds the offset snapshot for a producer, which is used for undo recovery when a
* Flink job fails over before completing its first checkpoint.
*
* <p>The snapshot contains bucket offsets organized by table ID, allowing the Flink Operator
* Coordinator to coordinate undo recovery across all subtasks.
*
* @since 0.9
*/
@PublicEvolving
public class ProducerOffsetsResult {

private final String producerId;
private final Map<Long, Map<TableBucket, Long>> tableOffsets;
private final long expirationTime;

/**
* Creates a new ProducerOffsetsResult.
*
* @param producerId the producer ID (typically Flink job ID)
* @param tableOffsets map of table ID to bucket offsets
* @param expirationTime the expiration timestamp in milliseconds
*/
public ProducerOffsetsResult(
String producerId,
Map<Long, Map<TableBucket, Long>> tableOffsets,
long expirationTime) {
this.producerId = producerId;
this.tableOffsets = Collections.unmodifiableMap(tableOffsets);
this.expirationTime = expirationTime;
}

/**
* Get the producer ID.
*
* @return the producer ID
*/
public String getProducerId() {
return producerId;
}

/**
* Get the offset snapshot for all tables.
*
* @return unmodifiable map of table ID to bucket offsets
*/
public Map<Long, Map<TableBucket, Long>> getTableOffsets() {
return tableOffsets;
}

/**
* Get the expiration timestamp.
*
* @return the expiration timestamp in milliseconds since epoch
*/
public long getExpirationTime() {
return expirationTime;
}

@Override
public String toString() {
return "ProducerOffsetsResult{"
+ "producerId='"
+ producerId
+ '\''
+ ", tableCount="
+ tableOffsets.size()
+ ", expirationTime="
+ expirationTime
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.client.admin;

import org.apache.fluss.annotation.PublicEvolving;

/**
* Result of producer offset registration.
*
* <p>This enum indicates whether a producer offset snapshot was newly created or already existed
* when calling {@link Admin#registerProducerOffsets}.
*
* @since 0.9
*/
@PublicEvolving
public enum RegisterResult {
/**
* Snapshot was newly created.
*
* <p>This indicates a first startup scenario where no previous snapshot existed. The caller
* does not need to perform undo recovery.
*/
CREATED,

/**
* Snapshot already existed and was not overwritten.
*
* <p>This indicates a failover scenario where a previous snapshot exists. The caller should
* perform undo recovery using the existing snapshot offsets.
*/
ALREADY_EXISTS
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,31 @@ public class ConfigOptions {
+ "The default value is 10. "
+ "This option is deprecated. Please use server.io-pool.size instead.");

/**
* The TTL (time-to-live) for producer offset snapshots. Snapshots older than this TTL will be
* automatically cleaned up by the coordinator server.
*/
public static final ConfigOption<Duration> COORDINATOR_PRODUCER_SNAPSHOT_TTL =
key("coordinator.producer-snapshot.ttl")
.durationType()
.defaultValue(Duration.ofHours(24))
.withDescription(
"The TTL (time-to-live) for producer offset snapshots. "
+ "Snapshots older than this TTL will be automatically cleaned up "
+ "by the coordinator server. Default is 24 hours.");

/**
* The interval for cleaning up expired producer offset snapshots and orphan files in remote
* storage.
*/
public static final ConfigOption<Duration> COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL =
key("coordinator.producer-snapshot.cleanup-interval")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription(
"The interval for cleaning up expired producer offset snapshots "
+ "and orphan files in remote storage. Default is 1 hour.");

// ------------------------------------------------------------------------
// ConfigOptions for Tablet Server
// ------------------------------------------------------------------------
Expand Down
Loading