diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java index c3ab3a352a..b8b004f601 100644 --- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java +++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java @@ -20,6 +20,8 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.record.LogRecord; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; @@ -43,8 +45,9 @@ public interface LakeWriter extends Closeable { /** * Completes the writing process and returns the write result. * - * @return the write result + * @return the write result, or null if no data was written (empty write scenario) * @throws IOException if an I/O error occurs */ + @Nullable WriteResult complete() throws IOException; } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java index 2f3a69a7ea..33c65a0c8c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java @@ -151,6 +151,17 @@ public void processElement(StreamRecord> str collectTableAllBucketWriteResult(tableId); if (committableWriteResults != null) { + // Check if any result is cancelled (table was dropped) + boolean isCancelled = + committableWriteResults.stream().anyMatch(TableBucketWriteResult::isCancelled); + if (isCancelled) { + LOG.info( + "Skipping commit for dropped table {}, table path {}.", + tableId, + tableBucketWriteResult.tablePath()); + collectedTableBucketWriteResults.remove(tableId); + return; + } try { CommitResult commitResult = commitWriteResults( diff 
--git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java new file mode 100644 index 0000000000..656810288b --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringTableDroppedEvent.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; + +/** + * SourceEvent used to notify TieringSourceReader that a table has been dropped or recreated, and + * all pending splits for this table should be skipped. 
+ */ +public class TieringTableDroppedEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final long tableId; + + public TieringTableDroppedEvent(long tableId) { + this.tableId = tableId; + } + + public long getTableId() { + return tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TieringTableDroppedEvent)) { + return false; + } + TieringTableDroppedEvent that = (TieringTableDroppedEvent) o; + return tableId == that.tableId; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableId); + } + + @Override + public String toString() { + return "TieringTableDroppedEvent{" + "tableId=" + tableId + '}'; + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java index abec3c6c21..72378ad70f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java @@ -30,6 +30,11 @@ * that the write result is for, the end log offset of tiering, the total number of write results in * one round of tiering. It'll be passed to downstream committer operator to collect all the write * results of a table and do commit. + * + *

When {@code cancelled} is {@code true}, it indicates this result originates from a tiering + * round that was aborted (e.g., the table was dropped). In this case the {@link WriteResult} will + * always be {@code null} and the downstream committer should skip the commit without emitting a + * finished or failed tiering event; for a dropped table the enumerator records the failed tiering + * epoch so that it is reported to the coordinator in a later heartbeat. */ public class TableBucketWriteResult implements Serializable { @@ -57,6 +62,9 @@ public class TableBucketWriteResult implements Serializable { // for the round of tiering is finished private final int numberOfWriteResults; + // indicates whether this result is from a cancelled tiering (e.g., table was dropped) + private final boolean cancelled; + public TableBucketWriteResult( TablePath tablePath, TableBucket tableBucket, @@ -64,7 +72,8 @@ public TableBucketWriteResult( @Nullable WriteResult writeResult, long logEndOffset, long maxTimestamp, - int numberOfWriteResults) { + int numberOfWriteResults, + boolean cancelled) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partitionName = partitionName; @@ -72,6 +81,7 @@ public TableBucketWriteResult( this.logEndOffset = logEndOffset; this.maxTimestamp = maxTimestamp; this.numberOfWriteResults = numberOfWriteResults; + this.cancelled = cancelled; } public TablePath tablePath() { @@ -103,4 +113,8 @@ public long logEndOffset() { public long maxTimestamp() { return maxTimestamp; } + + public boolean isCancelled() { + return cancelled; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java index 3651760955..31fb0e94a3 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java @@ -33,7 +33,10 @@ public class TableBucketWriteResultSerializer private static final ThreadLocal SERIALIZER_CACHE = ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; + + // Version 1: original format without the cancelled flag + private static final int VERSION_1 = 1; private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer writeResultSerializer; @@ -91,6 +94,9 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu // serialize number of write results out.writeInt(tableBucketWriteResult.numberOfWriteResults()); + // serialize cancelled flag + out.writeBoolean(tableBucketWriteResult.isCancelled()); + final byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -99,7 +105,7 @@ public byte[] serialize(TableBucketWriteResult tableBucketWriteResu @Override public TableBucketWriteResult deserialize(int version, byte[] serialized) throws IOException { - if (version != CURRENT_VERSION) { + if (version != CURRENT_VERSION && version != VERSION_1) { throw new IOException("Unknown version " + version); } final DataInputDeserializer in = new DataInputDeserializer(serialized); @@ -125,7 +131,9 @@ public TableBucketWriteResult deserialize(int version, byte[] seria if (writeResultLength >= 0) { byte[] writeResultBytes = new byte[writeResultLength]; in.readFully(writeResultBytes); - writeResult = writeResultSerializer.deserialize(version, writeResultBytes); + writeResult = + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), writeResultBytes); } else { writeResult = null; } @@ -136,6 +144,10 @@ public TableBucketWriteResult deserialize(int version, byte[] seria long maxTimestamp = in.readLong(); // deserialize number of write results int numberOfWriteResults = in.readInt(); + + // 
deserialize cancelled flag (added in version 2; default to false for version 1) + // Compare against VERSION_1 rather than CURRENT_VERSION: every version after 1 carries the + // flag, so version-2 payloads must keep reading it even after CURRENT_VERSION is bumped. + boolean cancelled = (version > VERSION_1) && in.readBoolean(); + return new TableBucketWriteResult<>( tablePath, tableBucket, @@ -143,6 +155,7 @@ public TableBucketWriteResult deserialize(int version, byte[] seria writeResult, logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + cancelled); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java index ac72aad664..95de907576 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -55,41 +55,61 @@ public TieringSourceFetcherManager( } public void markTableReachTieringMaxDuration(long tableId) { + LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId); + enqueueTaskForTable( + tableId, + reader -> { + LOG.debug( + "Executing handleTableReachTieringMaxDuration in split reader for table {}", + tableId); + reader.handleTableReachTieringMaxDuration(tableId); + }, + "handleTableReachTieringMaxDuration"); + } + + public void markTableDropped(long tableId) { + LOG.info("Enqueueing handleTableDropped task for table {}", tableId); + enqueueTaskForTable( + tableId, + reader -> { + LOG.debug("Executing handleTableDropped in split reader for table {}", tableId); + reader.handleTableDropped(tableId); + }, + "handleTableDropped"); + } + + private void enqueueTaskForTable( + long tableId, Consumer> action, String actionDesc) { + SplitFetcher, TieringSplit> splitFetcher; if (!fetchers.isEmpty()) { - // The fetcher thread is still running. This should be the majority of the cases. 
- LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId); - fetchers.values() - .forEach( - splitFetcher -> - enqueueMarkTableReachTieringMaxDurationTask( - splitFetcher, tableId)); + LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId); + fetchers.values().forEach(f -> enqueueReaderTask(f, action)); } else { - SplitFetcher, TieringSplit> splitFetcher = - createSplitFetcher(); LOG.info( - "fetchers is empty, enqueue marking tiering max duration for table {}", + "No active fetchers, creating new fetcher and enqueueing {} task for table {}", + actionDesc, tableId); - enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId); + splitFetcher = createSplitFetcher(); + enqueueReaderTask(splitFetcher, action); startFetcher(splitFetcher); } } - private void enqueueMarkTableReachTieringMaxDurationTask( + @SuppressWarnings("unchecked") + private void enqueueReaderTask( SplitFetcher, TieringSplit> splitFetcher, - long reachTieringDeadlineTable) { + Consumer> action) { splitFetcher.enqueueTask( new SplitFetcherTask() { @Override public boolean run() { - ((TieringSplitReader) splitFetcher.getSplitReader()) - .handleTableReachTieringMaxDuration(reachTieringDeadlineTable); + action.accept( + (TieringSplitReader) splitFetcher.getSplitReader()); return true; } @Override - public void wakeUp() { - // do nothing - } + public void wakeUp() {} }); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index eae584efed..562708628c 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.Connection; import 
org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; @@ -125,9 +126,15 @@ public void handleSourceEvents(SourceEvent sourceEvent) { TieringReachMaxDurationEvent reachMaxDurationEvent = (TieringReachMaxDurationEvent) sourceEvent; long tableId = reachMaxDurationEvent.getTableId(); - LOG.info("Received reach max duration for table {}", tableId); + LOG.info("Received reach max duration event for table {}", tableId); ((TieringSourceFetcherManager) splitFetcherManager) .markTableReachTieringMaxDuration(tableId); + } else if (sourceEvent instanceof TieringTableDroppedEvent) { + TieringTableDroppedEvent tableDroppedEvent = (TieringTableDroppedEvent) sourceEvent; + long tableId = tableDroppedEvent.getTableId(); + LOG.info("Received table dropped event for table {}", tableId); + ((TieringSourceFetcherManager) splitFetcherManager) + .markTableDropped(tableId); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index 1e530d69a3..ad392e9a86 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -84,6 +84,7 @@ public class TieringSplitReader private final Map> pendingTieringSplits; private final Set reachTieringMaxDurationTables; + private final Set droppedTables; private final Map> lakeWriters; private final Connection connection; @@ -128,6 +129,7 @@ protected 
TieringSplitReader( this.lakeWriters = new HashMap<>(); this.currentPendingSnapshotSplits = new ArrayDeque<>(); this.reachTieringMaxDurationTables = new HashSet<>(); + this.droppedTables = new HashSet<>(); this.pollTimeout = pollTimeout; } @@ -143,6 +145,13 @@ public RecordsWithSplitIds> fetch() throws I currentEmptySplits.clear(); return records; } + + // Check droppedTables BEFORE checkSplitOrStartNext to quickly respond to already-marked + // current table + if (currentTableId != null && droppedTables.contains(currentTableId)) { + return forceCompleteDroppedTable(); + } + checkSplitOrStartNext(); // may read snapshot firstly @@ -251,6 +260,25 @@ private void checkSplitOrStartNext() { } Set pendingSplits = pendingTieringSplits.remove(pendingTableId); + + // If the pending table is already dropped, set minimal table state from split metadata + // without calling getOrMoveToTable() to avoid RPC exception (TableNotExistException). + // The next fetch() cycle will detect the dropped flag and call forceCompleteDroppedTable(). 
+ if (droppedTables.contains(pendingTableId)) { + TieringSplit firstSplit = pendingSplits.iterator().next(); + currentTableId = pendingTableId; + currentTablePath = firstSplit.getTablePath(); + currentTableNumberOfSplits = firstSplit.getNumberOfSplits(); + for (TieringSplit split : pendingSplits) { + currentTableSplitsByBucket.put(split.getTableBucket(), split); + } + LOG.info( + "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.", + pendingTableId, + currentTablePath); + return; + } + for (TieringSplit split : pendingSplits) { getOrMoveToTable(split); addSplitToCurrentTable(split); @@ -445,7 +473,8 @@ private TableBucketWriteResult completeLakeWriter( writeResult, logEndOffset, maxTimestamp, - checkNotNull(currentTableNumberOfSplits)); + checkNotNull(currentTableNumberOfSplits), + false); } private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { @@ -463,7 +492,8 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set empt null, UNKNOWN_BUCKET_OFFSET, UNKNOWN_BUCKET_TIMESTAMP, - tieringSplit.getNumberOfSplits())); + tieringSplit.getNumberOfSplits(), + false)); } return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } @@ -544,6 +574,7 @@ private void finishCurrentTable() throws IOException { throw new IOException("Fail to finish current table.", e); } reachTieringMaxDurationTables.remove(currentTableId); + droppedTables.remove(currentTableId); // before switch to a new table, mark all as empty or null currentTableId = null; currentTablePath = null; @@ -570,6 +601,116 @@ public void handleTableReachTieringMaxDuration(long tableId) { } } + /** + * Handle a table being dropped. This will mark the table as dropped, and it will be force + * completed with empty results in the next fetch cycle. + * + *

For the currently active table, the dropped flag is set so that {@link #fetch()} detects + * it at the start of the next cycle and calls {@link #forceCompleteDroppedTable()}. For tables + * in {@code pendingTieringSplits}, the flag is also set here; those splits will be skipped when + * they become the active table and the dropped flag is detected. + * + * @param tableId the id of the dropped table + */ + public void handleTableDropped(long tableId) { + LOG.info( + "handleTableDropped, tableId: {}, currentTableId: {}, pendingTieringSplits: {}", + tableId, + currentTableId, + pendingTieringSplits); + if ((currentTableId != null && currentTableId.equals(tableId)) + || pendingTieringSplits.containsKey(tableId)) { + // The table is either the active table or still pending in this reader: mark it so that + // fetch() force-completes it. A table id that is neither active nor pending is not marked. + LOG.info("Table {} is dropped, will force to complete with empty results.", tableId); + droppedTables.add(tableId); + } + } + + /** + * Force complete tiering for a dropped table. This will close any in-progress lake writers + * without completing (discarding uncommitted data), then finish all remaining splits with null + * write results. 
+ */ + private RecordsWithSplitIds> forceCompleteDroppedTable() + throws IOException { + LOG.info("Force completing dropped table {}", currentTableId); + + Map> writeResults = new HashMap<>(); + Map finishedSplitIds = new HashMap<>(); + + // Generate empty results for all splits (both log and snapshot) + Iterator> splitsIterator = + currentTableSplitsByBucket.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry entry = splitsIterator.next(); + TableBucket bucket = entry.getKey(); + TieringSplit split = entry.getValue(); + if (split != null) { + // Close lake writer without complete - discard data for dropped table + LakeWriter lakeWriter = lakeWriters.remove(bucket); + if (lakeWriter != null) { + try { + lakeWriter.close(); + } catch (Exception e) { + LOG.warn("Failed to close lake writer for bucket {}", bucket, e); + } + } + + TableBucketWriteResult bucketResult = + toTableBucketWriteResult( + split.getTablePath(), + bucket, + split.getPartitionName(), + null, + UNKNOWN_BUCKET_OFFSET, + UNKNOWN_BUCKET_TIMESTAMP, + checkNotNull(currentTableNumberOfSplits), + true); + writeResults.put(bucket, bucketResult); + finishedSplitIds.put(bucket, split.splitId()); + LOG.info( + "Split {} is forced to be finished due to table dropped with empty result.", + split.splitId()); + splitsIterator.remove(); + } + } + + // Close any remaining lake writers that don't have corresponding splits + for (Map.Entry> entry : lakeWriters.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + LOG.warn("Failed to close orphan lake writer for bucket {}", entry.getKey(), e); + } + } + lakeWriters.clear(); + + // Also handle pending snapshot splits for this table + while (!currentPendingSnapshotSplits.isEmpty()) { + TieringSnapshotSplit snapshotSplit = currentPendingSnapshotSplits.poll(); + TableBucket bucket = snapshotSplit.getTableBucket(); + TableBucketWriteResult emptyResult = + toTableBucketWriteResult( + snapshotSplit.getTablePath(), + bucket, + 
snapshotSplit.getPartitionName(), + null, + UNKNOWN_BUCKET_OFFSET, + UNKNOWN_BUCKET_TIMESTAMP, + checkNotNull(currentTableNumberOfSplits), + true); + writeResults.put(bucket, emptyResult); + finishedSplitIds.put(bucket, snapshotSplit.splitId()); + LOG.info( + "Pending snapshot split {} is forced to be finished due to table dropped.", + snapshotSplit.splitId()); + } + + // Note: droppedTables.remove is handled by finishCurrentTable() + finishCurrentTable(); + return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); + } + @Override public void wakeUp() { if (currentLogScanner != null) { @@ -625,7 +766,8 @@ private TableBucketWriteResult toTableBucketWriteResult( @Nullable WriteResult writeResult, long endLogOffset, long maxTimestamp, - int numberOfSplits) { + int numberOfSplits, + boolean cancelled) { return new TableBucketWriteResult<>( tablePath, tableBucket, @@ -633,7 +775,8 @@ private TableBucketWriteResult toTableBucketWriteResult( writeResult, endLogOffset, maxTimestamp, - numberOfSplits); + numberOfSplits, + cancelled); } private class TableBucketWriteResultWithSplitIds diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index 337222f4e3..cf59e917b6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -27,6 +27,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import 
org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; @@ -39,9 +40,12 @@ import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; +import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; import org.apache.fluss.rpc.messages.PbLakeTieringStats; import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo; import org.apache.fluss.rpc.metrics.ClientMetricGroup; +import org.apache.fluss.rpc.protocol.ApiError; +import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.utils.MapUtils; import org.apache.flink.api.connector.source.ReaderInfo; @@ -407,8 +411,9 @@ private void assignSplits() { currentFailedTableEpochs, tieringTableEpochs); + LakeTieringHeartbeatResponse heartbeatResponse; if (pendingSplits.isEmpty() && !readersAwaitingSplit.isEmpty()) { - LakeTieringHeartbeatResponse heartbeatResponse = + heartbeatResponse = waitHeartbeatResponse( coordinatorGateway.lakeTieringHeartbeat( heartBeatWithRequestNewTieringTable(tieringHeartbeatRequest))); @@ -427,9 +432,14 @@ private void assignSplits() { } } else { // report heartbeat to fluss coordinator - waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest)); + heartbeatResponse = + waitHeartbeatResponse( + coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest)); } + // Process tiering_table_resp to detect table deletion errors + handleTieringTableResponseErrors(heartbeatResponse); + // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send // in next round currentFinishedTables.forEach(finishedTables::remove); @@ -437,6 +447,61 @@ private void assignSplits() { return lakeTieringInfo; } + /** + * Handle errors in tiering_table_resp from heartbeat 
response. If a table has been dropped, + * mark it as failed and notify readers to skip processing. + */ + private void handleTieringTableResponseErrors(LakeTieringHeartbeatResponse heartbeatResponse) { + for (PbHeartbeatRespForTable resp : heartbeatResponse.getTieringTableRespsList()) { + if (resp.hasError()) { + ApiError error = ApiError.fromErrorMessage(resp.getError()); + Errors errors = error.error(); + // Check if the error indicates table doesn't exist or tiering epoch is fenced + // (which happens when table is dropped and recreated) + if (errors == Errors.TABLE_NOT_EXIST + || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION + || errors == Errors.FENCED_TIERING_EPOCH_EXCEPTION) { + long tableId = resp.getTableId(); + LOG.warn( + "Table {} is dropped or epoch mismatch (error: {}), canceling tiering.", + tableId, + errors); + handleTableDropped(tableId); + } + } + } + } + + /** + * Handle a dropped table by marking all related splits to skip, removing from tiering epochs, + * and notifying readers. 
+ */ + @VisibleForTesting + protected void handleTableDropped(long tableId) { + // Remove from tiering table epochs + Long tieringEpoch = tieringTableEpochs.remove(tableId); + + // Mark all pending splits for this table to skip current round + for (TieringSplit tieringSplit : pendingSplits) { + if (tieringSplit.getTableBucket().getTableId() == tableId) { + tieringSplit.skipCurrentRound(); + } + } + + // Add to failed table epochs to notify coordinator in next heartbeat + if (tieringEpoch != null) { + failedTableEpochs.put(tableId, tieringEpoch); + } + + // Broadcast the event to all readers to stop processing this table + Set readers = new HashSet<>(context.registeredReaders().keySet()); + for (int reader : readers) { + TieringTableDroppedEvent event = new TieringTableDroppedEvent(tableId); + LOG.info("Send {} to reader {} for dropped table", event, reader); + context.sendEventToSourceReader(reader, event); + } + } + private void generateTieringSplits(Tuple3 tieringTable) throws FlinkRuntimeException { if (tieringTable == null) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java index 70d86bb2c8..06f8895050 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java @@ -391,7 +391,8 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { writeResult, logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + false); } private StreamRecord> @@ -403,6 +404,27 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { long logEndOffset, long maxTimestamp, int numberOfWriteResults) { + return 
createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + partitionName, + writeResult, + logEndOffset, + maxTimestamp, + numberOfWriteResults, + false); + } + + private StreamRecord> + createTableBucketWriteResultStreamRecord( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partitionName, + @Nullable Integer writeResult, + long logEndOffset, + long maxTimestamp, + int numberOfWriteResults, + boolean cancelled) { TableBucketWriteResult tableBucketWriteResult = new TableBucketWriteResult<>( tablePath, @@ -411,7 +433,8 @@ void testPartitionedTableCommitWhenFlussMissingLakeSnapshot() throws Exception { writeResult == null ? null : new TestingWriteResult(writeResult), logEndOffset, maxTimestamp, - numberOfWriteResults); + numberOfWriteResults, + cancelled); return new StreamRecord<>(tableBucketWriteResult); } @@ -505,6 +528,38 @@ void testCommitFailsWhenTableRecreated() throws Exception { .contains("dropped and recreated during tiering"); } + @Test + void testCommitSkippedWhenTableDropped() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_commit_skipped_when_table_dropped"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); + int numberOfWriteResults = 3; + + // Record the number of events before processing + int eventCountBefore = mockOperatorEventGateway.getEventsSent().size(); + + // Send all write results with cancelled=true + for (int bucket = 0; bucket < numberOfWriteResults; bucket++) { + TableBucket tableBucket = new TableBucket(tableId, bucket); + committerOperator.processElement( + createTableBucketWriteResultStreamRecord( + tablePath, + tableBucket, + null, // partitionName + bucket, // writeResult + bucket, // logEndOffset + (long) bucket, // maxTimestamp + numberOfWriteResults, + true)); // cancelled=true + } + + // Verify no lake snapshot was created + verifyNoLakeSnapshot(tablePath); + + // Verify no FinishedTieringEvent or FailedTieringEvent was sent + int eventCountAfter = 
mockOperatorEventGateway.getEventsSent().size(); + assertThat(eventCountAfter).isEqualTo(eventCountBefore); + } + private CommittedLakeSnapshot mockCommittedLakeSnapshot( long tableId, TablePath tablePath, int snapshotId, Map logEndOffsets) throws Exception { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java index dbb40eae17..4521eff339 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java @@ -21,6 +21,8 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -42,37 +44,82 @@ void testSerializeAndDeserialize(boolean isPartitioned) throws Exception { TableBucket tableBucket = isPartitioned ? new TableBucket(1, 1000L, 2) : new TableBucket(1, 2); String partitionName = isPartitioned ? 
"partition1" : null; - TableBucketWriteResult tableBucketWriteResult = - new TableBucketWriteResult<>( - tablePath, tableBucket, partitionName, testingWriteResult, 10, 30L, 20); - // test serialize and deserialize - byte[] serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); - TableBucketWriteResult deserialized = - tableBucketWriteResultSerializer.deserialize( - tableBucketWriteResultSerializer.getVersion(), serialized); - - assertThat(deserialized.tablePath()).isEqualTo(tablePath); - assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); - assertThat(deserialized.partitionName()).isEqualTo(partitionName); - TestingWriteResult deserializedWriteResult = deserialized.writeResult(); - assertThat(deserializedWriteResult).isNotNull(); - assertThat(deserializedWriteResult.getWriteResult()) + TableBucketWriteResult result = + serializeAndDeserialize( + new TableBucketWriteResult<>( + tablePath, + tableBucket, + partitionName, + testingWriteResult, + 10, + 30L, + 20, + false)); + assertThat(result.tablePath()).isEqualTo(tablePath); + assertThat(result.tableBucket()).isEqualTo(tableBucket); + assertThat(result.partitionName()).isEqualTo(partitionName); + assertThat(result.writeResult().getWriteResult()) .isEqualTo(testingWriteResult.getWriteResult()); - assertThat(deserialized.numberOfWriteResults()).isEqualTo(20); + assertThat(result.isCancelled()).isFalse(); + + // verify when writeResult is null, cancelled=true + result = + serializeAndDeserialize( + new TableBucketWriteResult<>( + tablePath, tableBucket, partitionName, null, 20, 30L, 30, true)); + assertThat(result.writeResult()).isNull(); + assertThat(result.isCancelled()).isTrue(); + } + + private TableBucketWriteResult serializeAndDeserialize( + TableBucketWriteResult input) throws Exception { + byte[] serialized = tableBucketWriteResultSerializer.serialize(input); + return tableBucketWriteResultSerializer.deserialize( + tableBucketWriteResultSerializer.getVersion(), serialized); 
+ } + + @Test + void testDeserializeVersion1IsBackwardCompatible() throws Exception { + // Manually construct a version-1 payload (no cancelled flag) and verify + // that it deserializes correctly with cancelled defaulting to false. + TablePath tablePath = TablePath.of("db1", "tb1"); + TableBucket tableBucket = new TableBucket(1, 2); + TestingWriteResult testingWriteResult = new TestingWriteResult(42); + TestingWriteResultSerializer writeResultSerializer = new TestingWriteResultSerializer(); + + DataOutputSerializer out = new DataOutputSerializer(64); + // table path + out.writeUTF(tablePath.getDatabaseName()); + out.writeUTF(tablePath.getTableName()); + // bucket (no partition) + out.writeLong(tableBucket.getTableId()); + out.writeBoolean(false); + out.writeInt(tableBucket.getBucket()); + // write result + byte[] writeResultBytes = writeResultSerializer.serialize(testingWriteResult); + out.writeInt(writeResultBytes.length); + out.write(writeResultBytes); + // log end offset + out.writeLong(100L); + // max timestamp + out.writeLong(200L); + // number of write results + out.writeInt(3); + // NOTE: no cancelled flag — this is a version-1 payload + byte[] v1Bytes = out.getCopyOfBuffer(); + + TableBucketWriteResult deserialized = + tableBucketWriteResultSerializer.deserialize(1, v1Bytes); - // verify when writeResult is null - tableBucketWriteResult = - new TableBucketWriteResult<>( - tablePath, tableBucket, partitionName, null, 20, 30L, 30); - serialized = tableBucketWriteResultSerializer.serialize(tableBucketWriteResult); - deserialized = - tableBucketWriteResultSerializer.deserialize( - tableBucketWriteResultSerializer.getVersion(), serialized); assertThat(deserialized.tablePath()).isEqualTo(tablePath); assertThat(deserialized.tableBucket()).isEqualTo(tableBucket); - assertThat(deserialized.partitionName()).isEqualTo(partitionName); - assertThat(deserialized.writeResult()).isNull(); - assertThat(deserialized.numberOfWriteResults()).isEqualTo(30); + 
assertThat(deserialized.writeResult()).isNotNull(); + assertThat(deserialized.writeResult().getWriteResult()).isEqualTo(42); + assertThat(deserialized.logEndOffset()).isEqualTo(100L); + assertThat(deserialized.maxTimestamp()).isEqualTo(200L); + assertThat(deserialized.numberOfWriteResults()).isEqualTo(3); + // cancelled must default to false when deserializing a version-1 payload + assertThat(deserialized.isCancelled()).isFalse(); } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java index 9e9de2c792..f2583e4562 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -25,6 +25,7 @@ import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; import org.apache.fluss.flink.tiering.TestingWriteResult; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.utils.FlinkTestBase; import org.apache.fluss.metadata.TableBucket; @@ -175,6 +176,120 @@ void testHandleTieringReachMaxDurationEvent() throws Exception { } } + @Test + void testHandlePendingTableDroppedBeforeFetch() throws Exception { + // Create two tables: one as "current" table and one as "pending" table + TablePath currentTablePath = TablePath.of("fluss", "test_current_table"); + long currentTableId = createTable(currentTablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + + TablePath pendingTablePath = TablePath.of("fluss", "test_pending_table_dropped"); + long pendingTableId = createTable(pendingTablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + + Configuration conf = 
new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + conf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + // Write some data to current table + writeRows( + connection, + currentTablePath, + Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), + true); + + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + elementsQueue = new FutureCompletingBlockingQueue<>(16); + TestingReaderContext readerContext = new TestingReaderContext(); + try (TieringSourceReader reader = + new TieringSourceReader<>( + elementsQueue, + readerContext, + connection, + new TestingLakeTieringFactory(), + Duration.ofMillis(500))) { + + reader.start(); + + // Add split for current table first - it will become the active table + TieringLogSplit currentSplit = + new TieringLogSplit( + currentTablePath, + new TableBucket(currentTableId, 0), + null, + EARLIEST_OFFSET, + 100L); + reader.addSplits(Collections.singletonList(currentSplit)); + + // Wait for the current table to start tiering + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + blockingQueue = getElementsQueue(reader); + waitUntil( + () -> !blockingQueue.isEmpty(), + Duration.ofSeconds(30), + "Fail to wait element queue is not empty."); + + // Now add split for pending table - it will go into pendingTieringSplits + TieringLogSplit pendingSplit = + new TieringLogSplit( + pendingTablePath, + new TableBucket(pendingTableId, 0), + null, + EARLIEST_OFFSET, + 100); + reader.addSplits(Collections.singletonList(pendingSplit)); + + // Mark the pending table as dropped BEFORE it becomes the active table + TieringTableDroppedEvent event = new TieringTableDroppedEvent(pendingTableId); + reader.handleSourceEvents(event); + + connection.getAdmin().dropTable(pendingTablePath, true).get(); + + // Force complete the current table so the pending table becomes active + TieringReachMaxDurationEvent 
maxDurationEvent = + new TieringReachMaxDurationEvent(currentTableId); + reader.handleSourceEvents(maxDurationEvent); + + // First, complete the current table + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> output = + new TestingReaderOutput<>(); + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output.getEmittedRecords().get(0); + // This should be the current table's result + assertThat(result.tableBucket().getTableId()).isEqualTo(currentTableId); + }); + + // Now the pending (dropped) table should become active and complete + // without throwing RPC exception + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> output = + new TestingReaderOutput<>(); + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output.getEmittedRecords().get(0); + // This should be the pending table's result + assertThat(result.tableBucket().getTableId()).isEqualTo(pendingTableId); + // write result should be null since dropped table discards data + assertThat(result.writeResult()).isNull(); + // offset and timestamp should be UNKNOWN + assertThat(result.logEndOffset()).isEqualTo(-1L); + assertThat(result.maxTimestamp()).isEqualTo(-1L); + // should be marked as cancelled + assertThat(result.isCancelled()).isTrue(); + }); + } + } + } + /** * Get the elementsQueue from TieringSourceReader using reflection. 
* diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index b725f99e7e..17a9c2c612 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.event.TieringTableDroppedEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; @@ -838,4 +839,50 @@ void testTableReachMaxTieringDuration() throws Throwable { && !split.shouldSkipCurrentRound()); } } + + @Test + void testHandleTableDropped() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-dropped-table-test"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + int numSubtasks = 2; + + appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks); + TieringSourceEnumerator enumerator = + createTieringSourceEnumerator(flussConf, context)) { + enumerator.start(); + + // Register all readers + for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { + context.registerSourceReader(subtaskId, subtaskId, "localhost-" + subtaskId); + } + + for (int subTask = 0; subTask < numSubtasks; subTask++) { + enumerator.handleSplitRequest(subTask, 
"localhost-" + subTask); + } + + // Wait for initial assignment - this registers the table in tieringTableEpochs + // Use numSubtasks (not DEFAULT_BUCKET_NUM) since only numSubtasks readers + // request splits, so at most numSubtasks assignments can be made + waitUntilTieringTableSplitAssignmentReady(context, numSubtasks, 200L); + + // Drop the table while tiering is in progress + conn.getAdmin().dropTable(tablePath, true).get(); + + // Directly call handleTableDropped to simulate detection of table drop + // This is similar to how testTableReachMaxTieringDuration directly triggers + // handleTableTieringReachMaxDuration via timer + enumerator.handleTableDropped(tableId); + + // Verify that TieringTableDroppedEvent was sent to all readers + // The contains assertion compares events with equals(), so it also exercises TieringTableDroppedEvent#equals + Map> eventsToReaders = context.getSentSourceEvent(); + assertThat(eventsToReaders).hasSize(numSubtasks); + for (Map.Entry> entry : eventsToReaders.entrySet()) { + assertThat(entry.getValue()).contains(new TieringTableDroppedEvent(tableId)); + } + } + } } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java index 8472b825b0..d9a3614d38 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java @@ -29,6 +29,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import java.util.List; @@ -76,6 +78,7 @@ public void write(LogRecord record) throws IOException { } } + @Nullable @Override public PaimonWriteResult complete() throws IOException { CommitMessage commitMessage; @@ -84,6 +87,10 @@
public PaimonWriteResult complete() throws IOException { } catch (Exception e) { throw new IOException("Failed to complete Paimon write.", e); } + if (commitMessage == null) { + // No data was written, return null to indicate empty write + return null; + } return new PaimonWriteResult(commitMessage); } diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java index 413b141898..8f6871bf26 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java @@ -61,8 +61,18 @@ public RecordWriter( public abstract void write(LogRecord record) throws Exception; + /** + * Completes the write process and returns the commit message. + * + * @return the commit message, or null if no data was written (empty write scenario) + */ + @Nullable CommitMessage complete() throws Exception { List commitMessages = tableWrite.prepareCommit(); + if (commitMessages.isEmpty()) { + // No data was written, return null to indicate empty write + return null; + } checkState( commitMessages.size() == 1, "The size of CommitMessage must be 1, but got %s.",