diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java index 74cf91e0e0..ab02d5b0e0 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java @@ -89,6 +89,7 @@ public JobClient build() throws Exception { tieringSourceBuilder.withPollTieringTableIntervalMs( flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis()); } + TieringSource tieringSource = tieringSourceBuilder.build(); DataStreamSource source = env.fromSource( diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java new file mode 100644 index 0000000000..dd3f32e262 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; + +/** + * SourceEvent used to notify TieringSourceReader that a table has reached the maximum tiering + * duration and should be force completed. + */ +public class TieringReachMaxDurationEvent implements SourceEvent { + + private static final long serialVersionUID = 1L; + + private final long tableId; + + public TieringReachMaxDurationEvent(long tableId) { + this.tableId = tableId; + } + + public long getTableId() { + return tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TieringReachMaxDurationEvent)) { + return false; + } + TieringReachMaxDurationEvent that = (TieringReachMaxDurationEvent) o; + return tableId == that.tableId; + } + + @Override + public int hashCode() { + return Objects.hashCode(tableId); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java index b2d8c28dc9..04448c1939 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java @@ -36,6 +36,8 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.StreamGraphHasherV2; @@ -78,7 +80,7 @@ public Boundedness getBoundedness() { @Override public SplitEnumerator createEnumerator( - SplitEnumeratorContext splitEnumeratorContext) throws Exception { + SplitEnumeratorContext splitEnumeratorContext) { return new TieringSourceEnumerator( flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); } @@ -86,8 +88,7 @@ public SplitEnumerator createEnumera @Override public SplitEnumerator restoreEnumerator( SplitEnumeratorContext splitEnumeratorContext, - TieringSourceEnumeratorState tieringSourceEnumeratorState) - throws Exception { + TieringSourceEnumeratorState tieringSourceEnumeratorState) { // stateless operator return new TieringSourceEnumerator( flussConf, splitEnumeratorContext, pollTieringTableIntervalMs); @@ -107,8 +108,11 @@ public SimpleVersionedSerializer getSplitSerializer() { @Override public SourceReader, TieringSplit> createReader( SourceReaderContext sourceReaderContext) { + FutureCompletingBlockingQueue>> + elementsQueue = new FutureCompletingBlockingQueue<>(); Connection connection = ConnectionFactory.createConnection(flussConf); - return new TieringSourceReader<>(sourceReaderContext, connection, lakeTieringFactory); + return new TieringSourceReader<>( + elementsQueue, sourceReaderContext, connection, lakeTieringFactory); } /** This follows the operator uid hash generation logic of flink {@link StreamGraphHasherV2}. */ diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java new file mode 100644 index 0000000000..4f67e12195 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source; + +import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter; +import org.apache.fluss.flink.tiering.source.split.TieringSplit; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher; +import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; + +import java.util.Collection; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches + * the max duration of tiering to {@link TieringSplitReader}. + */ +public class TieringSourceFetcherManager + extends SingleThreadFetcherManagerAdapter< + TableBucketWriteResult, TieringSplit> { + + public TieringSourceFetcherManager( + FutureCompletingBlockingQueue>> + elementsQueue, + Supplier, TieringSplit>> + splitReaderSupplier, + Configuration configuration, + Consumer> splitFinishedHook) { + super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook); + } + + public void markTableReachTieringMaxDuration(long tableId) { + if (!fetchers.isEmpty()) { + // The fetcher thread is still running. This should be the majority of the cases. + fetchers.values() + .forEach( + splitFetcher -> + enqueueMarkTableReachTieringMaxDurationTask( + splitFetcher, tableId)); + } else { + SplitFetcher, TieringSplit> splitFetcher = + createSplitFetcher(); + enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId); + startFetcher(splitFetcher); + } + } + + private void enqueueMarkTableReachTieringMaxDurationTask( + SplitFetcher, TieringSplit> splitFetcher, + long reachTieringDeadlineTable) { + splitFetcher.enqueueTask( + new SplitFetcherTask() { + @Override + public boolean run() { + ((TieringSplitReader) splitFetcher.getSplitReader()) + .handleTableReachTieringMaxDuration(reachTieringDeadlineTable); + return true; + } + + @Override + public void wakeUp() { + // do nothing + } + }); + } +} diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java index a6b14b320d..63e73bab97 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java @@ -18,23 +18,31 @@ package org.apache.fluss.flink.tiering.source; import org.apache.fluss.annotation.Internal; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.Connection; +import org.apache.fluss.flink.adapter.SingleThreadMultiplexSourceReaderBaseAdapter; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.state.TieringSplitState; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; +import static org.apache.fluss.flink.tiering.source.TieringSplitReader.DEFAULT_POLL_TIMEOUT; + /** A {@link SourceReader} that read records from Fluss and write to lake. */ @Internal public final class TieringSourceReader - extends SingleThreadMultiplexSourceReaderBase< + extends SingleThreadMultiplexSourceReaderBaseAdapter< TableBucketWriteResult, TableBucketWriteResult, TieringSplit, @@ -43,11 +51,29 @@ public final class TieringSourceReader private final Connection connection; public TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, SourceReaderContext context, Connection connection, LakeTieringFactory lakeTieringFactory) { + this(elementsQueue, context, connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + } + + @VisibleForTesting + TieringSourceReader( + FutureCompletingBlockingQueue>> + elementsQueue, + SourceReaderContext context, + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { super( - () -> new TieringSplitReader<>(connection, lakeTieringFactory), + elementsQueue, + new TieringSourceFetcherManager<>( + elementsQueue, + () -> new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout), + context.getConfiguration(), + (ignore) -> {}), new TableBucketWriteResultEmitter<>(), context.getConfiguration(), context); @@ -89,6 +115,17 @@ protected TieringSplit toSplitType(String splitId, TieringSplitState splitState) return splitState.toSourceSplit(); } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof TieringReachMaxDurationEvent) { + TieringReachMaxDurationEvent reachMaxDurationEvent = + (TieringReachMaxDurationEvent) sourceEvent; + long tableId = reachMaxDurationEvent.getTableId(); + ((TieringSourceFetcherManager) splitFetcherManager) + .markTableReachTieringMaxDuration(tableId); + } + } + @Override public void close() throws Exception { super.close(); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java index b9fe79e3d3..87daf64da1 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java @@ -17,6 +17,7 @@ package org.apache.fluss.flink.tiering.source; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.client.Connection; import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.scanner.ScanRecord; @@ -57,6 +58,7 @@ import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; /** The {@link SplitReader} implementation which will read Fluss and write to lake. */ public class TieringSplitReader @@ -64,18 +66,25 @@ public class TieringSplitReader private static final Logger LOG = LoggerFactory.getLogger(TieringSplitReader.class); - private static final Duration POLL_TIMEOUT = Duration.ofMillis(10000L); + public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10_000L); // unknown bucket timestamp for empty split or snapshot split private static final long UNKNOWN_BUCKET_TIMESTAMP = -1; + // unknown bucket offset for empty split or snapshot split + private static final long UNKNOWN_BUCKET_OFFSET = -1; + private final LakeTieringFactory lakeTieringFactory; + private final Duration pollTimeout; + // the id for the pending tables to be tiered private final Queue pendingTieringTables; // the table_id to the pending splits private final Map> pendingTieringSplits; + private final Set reachTieringMaxDurationTables; + private final Map> lakeWriters; private final Connection connection; @@ -92,38 +101,55 @@ public class TieringSplitReader // map from table bucket to split id private final Map currentTableSplitsByBucket; private final Map currentTableStoppingOffsets; - private final Set currentTableEmptyLogSplits; + + private final Map currentTableTieredOffsetAndTimestamp; + + private final Set currentEmptySplits; public TieringSplitReader( Connection connection, LakeTieringFactory lakeTieringFactory) { + this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT); + } + + @VisibleForTesting + protected TieringSplitReader( + Connection connection, + LakeTieringFactory lakeTieringFactory, + Duration pollTimeout) { this.lakeTieringFactory = lakeTieringFactory; // owned by TieringSourceReader this.connection = connection; this.pendingTieringTables = new ArrayDeque<>(); this.pendingTieringSplits = new HashMap<>(); this.currentTableStoppingOffsets = new HashMap<>(); - this.currentTableEmptyLogSplits = new HashSet<>(); + this.currentTableTieredOffsetAndTimestamp = new HashMap<>(); + this.currentEmptySplits = new HashSet<>(); this.currentTableSplitsByBucket = new HashMap<>(); this.lakeWriters = new HashMap<>(); this.currentPendingSnapshotSplits = new ArrayDeque<>(); + this.reachTieringMaxDurationTables = new HashSet<>(); + this.pollTimeout = pollTimeout; } @Override public RecordsWithSplitIds> fetch() throws IOException { // check empty splits - if (!currentTableEmptyLogSplits.isEmpty()) { - LOG.info("Empty split(s) {} finished.", currentTableEmptyLogSplits); - TableBucketWriteResultWithSplitIds records = forEmptySplits(currentTableEmptyLogSplits); - currentTableEmptyLogSplits.forEach( + if (!currentEmptySplits.isEmpty()) { + LOG.info("Empty split(s) {} finished.", currentEmptySplits); + TableBucketWriteResultWithSplitIds records = forEmptySplits(currentEmptySplits); + currentEmptySplits.forEach( split -> currentTableSplitsByBucket.remove(split.getTableBucket())); mayFinishCurrentTable(); - currentTableEmptyLogSplits.clear(); + currentEmptySplits.clear(); return records; } checkSplitOrStartNext(); // may read snapshot firstly if (currentSnapshotSplitReader != null) { + // for snapshot split, we don't force to complete it + // since we rely on the log offset for the snapshot to + // do next tiering, if force to complete, we can't get the log offset CloseableIterator recordIterator = currentSnapshotSplitReader.readBatch(); if (recordIterator == null) { LOG.info("Split {} is finished", currentSnapshotSplit.splitId()); @@ -134,7 +160,11 @@ public RecordsWithSplitIds> fetch() throws I } } else { if (currentLogScanner != null) { - ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT); + // force to complete records + if (reachTieringMaxDurationTables.contains(currentTableId)) { + return forceCompleteTieringLogRecords(); + } + ScanRecords scanRecords = currentLogScanner.poll(pollTimeout); return forLogRecords(scanRecords); } else { return emptyTableBucketWriteResultWithSplitIds(); @@ -152,6 +182,15 @@ public void handleSplitsChanges(SplitsChange splitsChange) { } for (TieringSplit split : splitsChange.splits()) { LOG.info("add split {}", split.splitId()); + if (split.shouldSkipCurrentRound()) { + // if the split is forced to ignore, + // mark it as empty + LOG.info( + "ignore split {} since the split is set to skip the current round of tiering.", + split.splitId()); + currentEmptySplits.add(split); + continue; + } long tableId = split.getTableBucket().getTableId(); // the split belongs to the current table if (currentTableId != null && currentTableId == tableId) { @@ -248,6 +287,58 @@ private void mayCreateLogScanner() { } } + private RecordsWithSplitIds> + forceCompleteTieringLogRecords() throws IOException { + Map> writeResults = new HashMap<>(); + Map finishedSplitIds = new HashMap<>(); + + // force finish all splits + Iterator> currentTieringSplitsIterator = + currentTableSplitsByBucket.entrySet().iterator(); + while (currentTieringSplitsIterator.hasNext()) { + Map.Entry entry = currentTieringSplitsIterator.next(); + TableBucket bucket = entry.getKey(); + TieringSplit split = entry.getValue(); + if (split != null && split.isTieringLogSplit()) { + // get the current offset, timestamp that tiered so far + LogOffsetAndTimestamp logOffsetAndTimestamp = + currentTableTieredOffsetAndTimestamp.get(bucket); + long logEndOffset = + logOffsetAndTimestamp == null + ? UNKNOWN_BUCKET_OFFSET + // logEndOffset is equal to offset tiered + 1 + : logOffsetAndTimestamp.logOffset + 1; + long timestamp = + logOffsetAndTimestamp == null + ? UNKNOWN_BUCKET_TIMESTAMP + : logOffsetAndTimestamp.timestamp; + TableBucketWriteResult bucketWriteResult = + completeLakeWriter( + bucket, split.getPartitionName(), logEndOffset, timestamp); + + if (logEndOffset == UNKNOWN_BUCKET_OFFSET) { + // when the log end offset is unknown, the write result must be + // null, otherwise, we should throw exception directly to avoid data + // inconsistent + checkState( + bucketWriteResult.writeResult() == null, + "bucketWriteResult must be null when log end offset is unknown when tiering " + + split); + } + + writeResults.put(bucket, bucketWriteResult); + finishedSplitIds.put(bucket, split.splitId()); + LOG.info( + "Split {} is forced to be finished due to tiering reach max duration.", + split.splitId()); + currentTieringSplitsIterator.remove(); + } + } + reachTieringMaxDurationTables.remove(this.currentTableId); + mayFinishCurrentTable(); + return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); + } + private RecordsWithSplitIds> forLogRecords( ScanRecords scanRecords) throws IOException { Map> writeResults = new HashMap<>(); @@ -272,6 +363,9 @@ private RecordsWithSplitIds> forLogRecords( } } ScanRecord lastRecord = bucketScanRecords.get(bucketScanRecords.size() - 1); + currentTableTieredOffsetAndTimestamp.put( + bucket, + new LogOffsetAndTimestamp(lastRecord.logOffset(), lastRecord.timestamp())); // has arrived into the end of the split, if (lastRecord.logOffset() >= stoppingOffset - 1) { currentTableStoppingOffsets.remove(bucket); @@ -293,7 +387,11 @@ private RecordsWithSplitIds> forLogRecords( lastRecord.timestamp())); // put split of the bucket finishedSplitIds.put(bucket, currentSplitId); - LOG.info("Split {} has been finished.", currentSplitId); + LOG.info( + "Finish tier bucket {} for table {}, split: {}.", + bucket, + currentTablePath, + currentSplitId); } } @@ -327,8 +425,11 @@ private TableBucketWriteResult completeLakeWriter( long maxTimestamp) throws IOException { LakeWriter lakeWriter = lakeWriters.remove(bucket); - WriteResult writeResult = lakeWriter.complete(); - lakeWriter.close(); + WriteResult writeResult = null; + if (lakeWriter != null) { + writeResult = lakeWriter.complete(); + lakeWriter.close(); + } return toTableBucketWriteResult( currentTablePath, bucket, @@ -339,22 +440,22 @@ private TableBucketWriteResult completeLakeWriter( checkNotNull(currentTableNumberOfSplits)); } - private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { + private TableBucketWriteResultWithSplitIds forEmptySplits(Set emptySplits) { Map> writeResults = new HashMap<>(); Map finishedSplitIds = new HashMap<>(); - for (TieringLogSplit logSplit : emptySplits) { - TableBucket tableBucket = logSplit.getTableBucket(); - finishedSplitIds.put(tableBucket, logSplit.splitId()); + for (TieringSplit tieringSplit : emptySplits) { + TableBucket tableBucket = tieringSplit.getTableBucket(); + finishedSplitIds.put(tableBucket, tieringSplit.splitId()); writeResults.put( tableBucket, toTableBucketWriteResult( - logSplit.getTablePath(), + tieringSplit.getTablePath(), tableBucket, - logSplit.getPartitionName(), + tieringSplit.getPartitionName(), null, - logSplit.getStoppingOffset(), + UNKNOWN_BUCKET_OFFSET, UNKNOWN_BUCKET_TIMESTAMP, - logSplit.getNumberOfSplits())); + tieringSplit.getNumberOfSplits())); } return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds); } @@ -362,7 +463,6 @@ private TableBucketWriteResultWithSplitIds forEmptySplits(Set e private void mayFinishCurrentTable() throws IOException { // no any pending splits for the table, just finish the table if (currentTableSplitsByBucket.isEmpty()) { - LOG.info("Finish tier table {} of table id {}.", currentTablePath, currentTableId); finishCurrentTable(); } } @@ -377,6 +477,11 @@ private TableBucketWriteResultWithSplitIds finishCurrentSnapshotSplit() throws I currentSnapshotSplit.getPartitionName(), logEndOffset, UNKNOWN_BUCKET_TIMESTAMP); + LOG.info( + "Finish tier bucket {} for table {}, split: {}.", + tableBucket, + currentTablePath, + splitId); closeCurrentSnapshotSplit(); mayFinishCurrentTable(); return new TableBucketWriteResultWithSplitIds( @@ -430,17 +535,29 @@ private void finishCurrentTable() throws IOException { } catch (Exception e) { throw new IOException("Fail to finish current table.", e); } - + reachTieringMaxDurationTables.remove(currentTableId); // before switch to a new table, mark all as empty or null currentTableId = null; currentTablePath = null; currentTableNumberOfSplits = null; currentPendingSnapshotSplits.clear(); currentTableStoppingOffsets.clear(); - currentTableEmptyLogSplits.clear(); + currentTableTieredOffsetAndTimestamp.clear(); currentTableSplitsByBucket.clear(); } + /** + * Handle a table reach max tiering duration. This will mark the current table as reaching max + * duration, and it will be force completed in the next fetch cycle. + */ + public void handleTableReachTieringMaxDuration(long tableId) { + if ((currentTableId != null && currentTableId.equals(tableId)) + || pendingTieringSplits.containsKey(tableId)) { + LOG.info("Table {} reach tiering max duration, will force to complete.", tableId); + reachTieringMaxDurationTables.add(tableId); + } + } + @Override public void wakeUp() { if (currentLogScanner != null) { @@ -466,7 +583,7 @@ private void subscribeLog(TieringLogSplit logSplit) { long stoppingOffset = logSplit.getStoppingOffset(); long startingOffset = logSplit.getStartingOffset(); if (startingOffset >= stoppingOffset || stoppingOffset <= 0) { - currentTableEmptyLogSplits.add(logSplit); + currentEmptySplits.add(logSplit); return; } else { currentTableStoppingOffsets.put(tableBucket, stoppingOffset); @@ -559,4 +676,15 @@ public Set finishedSplits() { return new HashSet<>(bucketSplits.values()); } } + + private static final class LogOffsetAndTimestamp { + + private final long logOffset; + private final long timestamp; + + public LogOffsetAndTimestamp(long logOffset, long timestamp) { + this.logOffset = logOffset; + this.timestamp = timestamp; + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java index c1c26390a3..7fd36c656a 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java @@ -26,9 +26,11 @@ import org.apache.fluss.flink.metrics.FlinkMetricRegistry; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; @@ -54,12 +56,16 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -90,13 +96,16 @@ public class TieringSourceEnumerator private final Configuration flussConf; private final SplitEnumeratorContext context; + private final ScheduledExecutorService timerService; private final SplitEnumeratorMetricGroup enumeratorMetricGroup; private final long pollTieringTableIntervalMs; private final List pendingSplits; private final Set readersAwaitingSplit; + private final Map tieringTableEpochs; private final Map failedTableEpochs; - private final Map finishedTableEpochs; + private final Map finishedTables; + private final Set tieringReachMaxDurationsTables; // lazily instantiated private RpcClient rpcClient; @@ -116,13 +125,17 @@ public TieringSourceEnumerator( long pollTieringTableIntervalMs) { this.flussConf = flussConf; this.context = context; + this.timerService = + Executors.newSingleThreadScheduledExecutor( + r -> new Thread(r, "Tiering-Timer-Thread")); this.enumeratorMetricGroup = context.metricGroup(); this.pollTieringTableIntervalMs = pollTieringTableIntervalMs; - this.pendingSplits = new ArrayList<>(); - this.readersAwaitingSplit = new TreeSet<>(); + this.pendingSplits = Collections.synchronizedList(new ArrayList<>()); + this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>()); this.tieringTableEpochs = MapUtils.newConcurrentHashMap(); - this.finishedTableEpochs = MapUtils.newConcurrentHashMap(); + this.finishedTables = MapUtils.newConcurrentHashMap(); this.failedTableEpochs = MapUtils.newConcurrentHashMap(); + this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>()); } @Override @@ -168,8 +181,24 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } LOG.info("TieringSourceReader {} requests split.", subtaskId); readersAwaitingSplit.add(subtaskId); - this.context.callAsync( - this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits); + + // If pending splits exist, assign them directly to the requesting reader + if (!pendingSplits.isEmpty()) { + assignSplits(); + } else { + // Note: Ideally, only one table should be tiering at a time. + // Here we block to request a tiering table synchronously to avoid multiple threads + // requesting tiering tables concurrently, which would cause the enumerator to contain + // multiple tiering tables simultaneously. This is not optimal for tiering performance. + Tuple3 tieringTable = null; + Throwable throwable = null; + try { + tieringTable = this.requestTieringTableSplitsViaHeartBeat(); + } catch (Throwable t) { + throwable = t; + } + this.generateAndAssignSplits(tieringTable, throwable); + } } @Override @@ -241,7 +270,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { "The finished table {} is not in tiering table, won't report it to Fluss to mark as finished.", finishedTableId); } else { - finishedTableEpochs.put(finishedTableId, tieringEpoch); + boolean isForceFinished = tieringReachMaxDurationsTables.remove(finishedTableId); + finishedTables.put( + finishedTableId, TieringFinishInfo.from(tieringEpoch, isForceFinished)); } } @@ -263,7 +294,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } } - if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) { + if (!finishedTables.isEmpty() || !failedTableEpochs.isEmpty()) { // call one round of heartbeat to notify table has been finished or failed this.context.callAsync( this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits); @@ -277,6 +308,7 @@ private void handleSourceReaderFailOver() { // we need to make all as failed failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs)); tieringTableEpochs.clear(); + tieringReachMaxDurationsTables.clear(); // also clean all pending splits since we mark all as failed pendingSplits.clear(); if (!failedTableEpochs.isEmpty()) { @@ -286,6 +318,31 @@ private void handleSourceReaderFailOver() { } } + @VisibleForTesting + protected void handleTableTieringReachMaxDuration( + TablePath tablePath, long tableId, long tieringEpoch) { + Long currentEpoch = tieringTableEpochs.get(tableId); + if (currentEpoch != null && currentEpoch.equals(tieringEpoch)) { + LOG.info("Table {}-{} reached max duration. Force completing.", tablePath, tableId); + tieringReachMaxDurationsTables.add(tableId); + + for (TieringSplit tieringSplit : pendingSplits) { + if (tieringSplit.getTableBucket().getTableId() == tableId) { + // mark this tiering split to skip the current round since the tiering for + // this table has timed out, so the tiering source reader can skip them directly + tieringSplit.skipCurrentRound(); + } + } + + // broadcast the tiering reach max duration event to all readers, + // we broadcast all for simplicity + Set readers = new HashSet<>(context.registeredReaders().keySet()); + for (int reader : readers) { + context.sendEventToSourceReader(reader, new TieringReachMaxDurationEvent(tableId)); + } + } + } + private void generateAndAssignSplits( @Nullable Tuple3 tieringTable, Throwable throwable) { if (throwable != null) { @@ -298,24 +355,21 @@ private void generateAndAssignSplits( } private void assignSplits() { - // we don't assign splits during failovering + // we don't assign splits during failover if (isFailOvering) { return; } - /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */ - synchronized (readersAwaitingSplit) { - if (!readersAwaitingSplit.isEmpty()) { - final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]); - for (Integer nextAwaitingReader : readers) { - if (!context.registeredReaders().containsKey(nextAwaitingReader)) { - readersAwaitingSplit.remove(nextAwaitingReader); - continue; - } - if (!pendingSplits.isEmpty()) { - TieringSplit tieringSplit = pendingSplits.remove(0); - context.assignSplit(tieringSplit, nextAwaitingReader); - readersAwaitingSplit.remove(nextAwaitingReader); - } + if (!readersAwaitingSplit.isEmpty()) { + final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]); + for (Integer nextAwaitingReader : readers) { + if (!context.registeredReaders().containsKey(nextAwaitingReader)) { + readersAwaitingSplit.remove(nextAwaitingReader); + continue; + } + if (!pendingSplits.isEmpty()) { + TieringSplit tieringSplit = pendingSplits.remove(0); + context.assignSplit(tieringSplit, nextAwaitingReader); + readersAwaitingSplit.remove(nextAwaitingReader); } } } @@ -325,13 +379,13 @@ private void assignSplits() { if (closed) { return null; } - Map currentFinishedTableEpochs = new HashMap<>(this.finishedTableEpochs); + Map currentFinishedTables = new HashMap<>(this.finishedTables); Map currentFailedTableEpochs = new HashMap<>(this.failedTableEpochs); LakeTieringHeartbeatRequest tieringHeartbeatRequest = tieringTableHeartBeat( basicHeartBeat(), this.tieringTableEpochs, - currentFinishedTableEpochs, + currentFinishedTables, currentFailedTableEpochs, this.flussCoordinatorEpoch); @@ -360,9 +414,9 @@ private void assignSplits() { waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest)); } - // if come to here, we can remove currentFinishedTableEpochs/failedTableEpochs to avoid send + // if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send // in next round - currentFinishedTableEpochs.forEach(finishedTableEpochs::remove); + currentFinishedTables.forEach(finishedTables::remove); currentFailedTableEpochs.forEach(failedTableEpochs::remove); return lakeTieringInfo; } @@ -375,9 +429,13 @@ private void generateTieringSplits(Tuple3 tieringTable) long start = System.currentTimeMillis(); LOG.info("Generate Tiering splits for table {}.", tieringTable.f2); try { + TablePath tablePath = tieringTable.f2; + final TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get(); List tieringSplits = - populateNumberOfTieringSplits( - splitGenerator.generateTableSplits(tieringTable.f2)); + populateNumberOfTieringSplits(splitGenerator.generateTableSplits(tableInfo)); + // shuffle tiering split to avoid splits tiering skew + // after introduce tiering max duration + Collections.shuffle(tieringSplits); LOG.info( "Generate Tiering {} splits for table {} with cost {}ms.", tieringSplits.size(), @@ -387,10 +445,23 @@ private void generateTieringSplits(Tuple3 tieringTable) LOG.info( "Generate Tiering splits for table {} is empty, no need to tier data.", tieringTable.f2.getTableName()); - finishedTableEpochs.put(tieringTable.f0, tieringTable.f1); + finishedTables.put(tieringTable.f0, TieringFinishInfo.from(tieringTable.f1)); } else { tieringTableEpochs.put(tieringTable.f0, tieringTable.f1); pendingSplits.addAll(tieringSplits); + + timerService.schedule( + () -> + context.runInCoordinatorThread( + () -> + handleTableTieringReachMaxDuration( + tablePath, + tieringTable.f0, + tieringTable.f1)), + + // for simplicity, we use the freshness as + tableInfo.getTableConfig().getDataLakeFreshness().toMillis(), + TimeUnit.MILLISECONDS); } } catch (Exception e) { LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e); @@ -414,6 +485,7 @@ public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exce @Override public void close() throws IOException { closed = true; + timerService.shutdownNow(); if (rpcClient != null) { failedTableEpochs.putAll(tieringTableEpochs); tieringTableEpochs.clear(); @@ -484,16 +556,28 @@ static LakeTieringHeartbeatRequest heartBeatWithRequestNewTieringTable( static LakeTieringHeartbeatRequest tieringTableHeartBeat( LakeTieringHeartbeatRequest heartbeatRequest, Map tieringTableEpochs, - Map finishedTableEpochs, + Map finishedTables, Map failedTableEpochs, int coordinatorEpoch) { if (!tieringTableEpochs.isEmpty()) { heartbeatRequest.addAllTieringTables( toPbHeartbeatReqForTable(tieringTableEpochs, coordinatorEpoch)); } - if (!finishedTableEpochs.isEmpty()) { + if (!finishedTables.isEmpty()) { + Map finishTieringEpochs = new HashMap<>(); + Set forceFinishedTables = new HashSet<>(); + finishedTables.forEach( + (tableId, tieringFinishInfo) -> { + finishTieringEpochs.put(tableId, tieringFinishInfo.tieringEpoch); + if (tieringFinishInfo.isForceFinished) { + forceFinishedTables.add(tableId); + } + }); heartbeatRequest.addAllFinishedTables( - toPbHeartbeatReqForTable(finishedTableEpochs, coordinatorEpoch)); + toPbHeartbeatReqForTable(finishTieringEpochs, coordinatorEpoch)); + for (long forceFinishedTableId : forceFinishedTables) { + heartbeatRequest.addForceFinishedTable(forceFinishedTableId); + } } // add failed tiering table to heart beat request return failedTableHeartBeat(heartbeatRequest, failedTableEpochs, coordinatorEpoch); @@ -537,4 +621,29 @@ static LakeTieringHeartbeatResponse waitHeartbeatResponse( } } } + + private static class TieringFinishInfo { + /** The epoch of the tiering operation for this table. */ + long tieringEpoch; + + /** + * Whether this table was force finished due to reaching the maximum tiering duration. When + * a table's tiering operation exceeds the max duration (data lake freshness), it will be + * force finished to prevent it from blocking other tables' tiering operations. + */ + boolean isForceFinished; + + public static TieringFinishInfo from(long tieringEpoch) { + return new TieringFinishInfo(tieringEpoch, false); + } + + public static TieringFinishInfo from(long tieringEpoch, boolean isForceFinished) { + return new TieringFinishInfo(tieringEpoch, isForceFinished); + } + + private TieringFinishInfo(long tieringEpoch, boolean isForceFinished) { + this.tieringEpoch = tieringEpoch; + this.isForceFinished = isForceFinished; + } + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java index 3751e531f8..afd8d75f8f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java @@ -54,7 +54,25 @@ public TieringLogSplit( long startingOffset, long stoppingOffset, int numberOfSplits) { - super(tablePath, tableBucket, partitionName, numberOfSplits); + this( + tablePath, + tableBucket, + partitionName, + startingOffset, + stoppingOffset, + numberOfSplits, + false); + } + + public TieringLogSplit( + TablePath tablePath, + TableBucket tableBucket, + @Nullable String partitionName, + long startingOffset, + long stoppingOffset, + int numberOfSplits, + boolean skipCurrentRound) { + super(tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound); this.startingOffset = startingOffset; this.stoppingOffset = stoppingOffset; } @@ -82,12 +100,14 @@ public String toString() { + ", partitionName='" + partitionName + '\'' + + ", numberOfSplits=" + + numberOfSplits + + ", skipCurrentRound=" + + skipCurrentRound + ", startingOffset=" + startingOffset + ", stoppingOffset=" + stoppingOffset - + ", numberOfSplits=" - + numberOfSplits + '}'; } @@ -99,7 +119,8 @@ public TieringLogSplit copy(int numberOfSplits) { partitionName, startingOffset, stoppingOffset, - numberOfSplits); + numberOfSplits, + skipCurrentRound); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java index a0a095d578..e6b37a925f 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java @@ -43,10 +43,16 @@ public TieringSnapshotSplit( TableBucket tableBucket, @Nullable String partitionName, long snapshotId, - long logOffsetOfSnapshot) { - super(tablePath, tableBucket, partitionName, UNKNOWN_NUMBER_OF_SPLITS); - this.snapshotId = snapshotId; - this.logOffsetOfSnapshot = logOffsetOfSnapshot; + long logOffsetOfSnapshot, + int numberOfSplits) { + this( + tablePath, + tableBucket, + partitionName, + snapshotId, + logOffsetOfSnapshot, + numberOfSplits, + false); } public TieringSnapshotSplit( @@ -55,8 +61,9 @@ public TieringSnapshotSplit( @Nullable String partitionName, long snapshotId, long logOffsetOfSnapshot, - int numberOfSplits) { - super(tablePath, tableBucket, partitionName, numberOfSplits); + int numberOfSplits, + boolean skipCurrentRound) { + super(tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound); this.snapshotId = snapshotId; this.logOffsetOfSnapshot = logOffsetOfSnapshot; } @@ -84,12 +91,14 @@ public String toString() { + ", partitionName='" + partitionName + '\'' + + ", numberOfSplits=" + + numberOfSplits + + ", skipCurrentRound=" + + skipCurrentRound + ", snapshotId=" + snapshotId + ", logOffsetOfSnapshot=" + logOffsetOfSnapshot - + ", numberOfSplits=" - + numberOfSplits + '}'; } @@ -101,7 +110,8 @@ public TieringSnapshotSplit copy(int numberOfSplits) { partitionName, snapshotId, logOffsetOfSnapshot, - numberOfSplits); + numberOfSplits, + skipCurrentRound); } @Override diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java index 9da98b8387..d6827a62cf 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java @@ -41,11 +41,18 @@ public abstract class TieringSplit implements SourceSplit { // the total number of splits in one round of tiering protected final int numberOfSplits; + /** + * Indicates whether to skip tiering data for this split in the current round of tiering. When + * set to true, the split will not be processed and tiering for the split will be skipped. + */ + protected boolean skipCurrentRound; + public TieringSplit( TablePath tablePath, TableBucket tableBucket, @Nullable String partitionName, - int numberOfSplits) { + int numberOfSplits, + boolean skipCurrentRound) { this.tablePath = tablePath; this.tableBucket = tableBucket; this.partitionName = partitionName; @@ -55,6 +62,7 @@ public TieringSplit( "Partition name and partition id must be both null or both not null."); } this.numberOfSplits = numberOfSplits; + this.skipCurrentRound = skipCurrentRound; } /** Checks whether this split is a primary key table split to tier. */ @@ -72,6 +80,23 @@ public final boolean isTieringLogSplit() { return getClass() == TieringLogSplit.class; } + /** + * Marks this split to skip reading data in the current round. Once called, the split will not + * be processed and data reading will be skipped. + */ + public void skipCurrentRound() { + this.skipCurrentRound = true; + } + + /** + * Returns whether this split should skip tiering data in the current round of tiering. + * + * @return true if the split should skip tiering data, false otherwise + */ + public boolean shouldSkipCurrentRound() { + return skipCurrentRound; + } + /** Casts this split into a {@link TieringLogSplit}. */ public TieringLogSplit asTieringLogSplit() { return (TieringLogSplit) this; @@ -128,11 +153,13 @@ public boolean equals(Object object) { return Objects.equals(tablePath, that.tablePath) && Objects.equals(tableBucket, that.tableBucket) && Objects.equals(partitionName, that.partitionName) - && numberOfSplits == that.numberOfSplits; + && numberOfSplits == that.numberOfSplits + && skipCurrentRound == that.skipCurrentRound; } @Override public int hashCode() { - return Objects.hash(tablePath, tableBucket, partitionName, numberOfSplits); + return Objects.hash( + tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound); } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java index 373118a972..1bb94a7a92 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java @@ -56,9 +56,8 @@ public TieringSplitGenerator(Admin flussAdmin) { this.flussAdmin = flussAdmin; } - public List generateTableSplits(TablePath tablePath) throws Exception { - - final TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get(); + public List generateTableSplits(TableInfo tableInfo) throws Exception { + TablePath tablePath = tableInfo.getTablePath(); final BucketOffsetsRetriever bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(flussAdmin, tablePath); @@ -287,7 +286,7 @@ private Optional generateSplitForPrimaryKeyTableBucket( latestBucketOffset, 0)); } else { - LOG.info( + LOG.debug( "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}", lastCommittedBucketOffset, latestBucketOffset, @@ -320,8 +319,7 @@ private Optional generateSplitForLogTableBucket( tableBucket, partitionName, EARLIEST_OFFSET, - latestBucketOffset, - 0)); + latestBucketOffset)); } else { // the bucket has been tiered, scan remain fluss log if (lastCommittedBucketOffset < latestBucketOffset) { @@ -331,11 +329,10 @@ private Optional generateSplitForLogTableBucket( tableBucket, partitionName, lastCommittedBucketOffset, - latestBucketOffset, - 0)); + latestBucketOffset)); } } - LOG.info( + LOG.debug( "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}", lastCommittedBucketOffset, latestBucketOffset, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java index 1c2997a540..62c4f78868 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java @@ -17,6 +17,8 @@ package org.apache.fluss.flink.tiering.source.split; +import org.apache.fluss.flink.tiering.source.TieringSource; +import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; @@ -26,7 +28,13 @@ import java.io.IOException; -/** A serializer for the {@link TieringSplit}. */ +/** + * A serializer for the {@link TieringSplit}. + * + *

This serializer is only used to serialize and deserialize splits sent from {@link + * TieringSourceEnumerator} to {@link TieringSource} for network transmission. Therefore, it does + * not need to consider compatibility. + */ public class TieringSplitSerializer implements SimpleVersionedSerializer { public static final TieringSplitSerializer INSTANCE = new TieringSplitSerializer(); @@ -76,6 +84,8 @@ public byte[] serialize(TieringSplit split) throws IOException { // write number of splits out.writeInt(split.getNumberOfSplits()); + // write skipCurrentRound + out.writeBoolean(split.shouldSkipCurrentRound()); if (split.isTieringSnapshotSplit()) { // Snapshot split TieringSnapshotSplit tieringSnapshotSplit = split.asTieringSnapshotSplit(); @@ -128,6 +138,7 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti // deserialize number of splits int numberOfSplits = in.readInt(); + boolean skipCurrentRound = in.readBoolean(); if (splitKind == TIERING_SNAPSHOT_SPLIT_FLAG) { // deserialize snapshot id @@ -140,7 +151,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti partitionName, snapshotId, logOffsetOfSnapshot, - numberOfSplits); + numberOfSplits, + skipCurrentRound); } else { // deserialize starting offset long startingOffset = in.readLong(); @@ -152,7 +164,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti partitionName, startingOffset, stoppingOffset, - numberOfSplits); + numberOfSplits, + skipCurrentRound); } } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java index 130a308126..7e3ee48a78 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.writer.AppendWriter; import org.apache.fluss.client.table.writer.TableWriter; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.config.ConfigOptions; @@ -37,6 +38,7 @@ import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -44,8 +46,11 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Map; +import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; @@ -69,7 +74,7 @@ private static Configuration initConfig() { Configuration conf = new Configuration(); conf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE); - // Configure the tiering sink to be Lance + // Configure the tiering sink to be Lance for testing purpose conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE); return conf; } @@ -94,26 +99,33 @@ static void afterAll() throws Exception { } @BeforeEach - public void beforeEach() { + void beforeEach() { execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); execEnv.setParallelism(2); + execEnv.enableCheckpointing(500); } - protected long createPkTable( - TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema) + protected long createTable(TablePath tablePath, Schema schema) throws Exception { + return createTable(tablePath, 1, Collections.emptyList(), schema, Collections.emptyMap()); + } + + protected long createTable( + TablePath tablePath, + int bucketNum, + List bucketKeys, + Schema schema, + Map customProperties) throws Exception { - TableDescriptor.Builder pkTableBuilder = + TableDescriptor.Builder tableBuilder = TableDescriptor.builder() .schema(schema) - .distributedBy(bucketNum) + .distributedBy(bucketNum, bucketKeys) .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) + .customProperties(customProperties); - if (enableAutoCompaction) { - pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true"); - } - return createTable(tablePath, pkTableBuilder.build()); + return createTable(tablePath, tableBuilder.build()); } protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) @@ -138,12 +150,21 @@ protected Replica getLeaderReplica(TableBucket tableBucket) { return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); } - protected void writeRows(TablePath tablePath, List rows) throws Exception { + protected void writeRows(TablePath tablePath, List rows, boolean append) + throws Exception { try (Table table = conn.getTable(tablePath)) { TableWriter tableWriter; - tableWriter = table.newUpsert().createWriter(); + if (append) { + tableWriter = table.newAppend().createWriter(); + } else { + tableWriter = table.newUpsert().createWriter(); + } for (InternalRow row : rows) { - ((UpsertWriter) tableWriter).upsert(row); + if (tableWriter instanceof AppendWriter) { + ((AppendWriter) tableWriter).append(row); + } else { + ((UpsertWriter) tableWriter).upsert(row); + } } tableWriter.flush(); } @@ -152,4 +173,22 @@ protected void writeRows(TablePath tablePath, List rows) throws Exc public List getValuesRecords(TablePath tablePath) { return TestingValuesLake.getResults(tablePath.toString()); } + + protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { + return buildTieringJob(execEnv, new Configuration()); + } + + protected JobClient buildTieringJob( + StreamExecutionEnvironment execEnv, Configuration lakeTieringConfig) throws Exception { + Configuration flussConfig = new Configuration(clientConf); + flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); + + return LakeTieringJobBuilder.newBuilder( + execEnv, + flussConfig, + new Configuration(), + lakeTieringConfig, + DataLakeFormat.LANCE.toString()) + .build(); + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java index 8470312252..07b2ddbd08 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java @@ -18,38 +18,23 @@ package org.apache.fluss.flink.tiering; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo; -import org.apache.fluss.flink.tiering.committer.TieringCommitOperatorFactory; -import org.apache.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo; -import org.apache.fluss.flink.tiering.source.TieringSource; import org.apache.fluss.lake.values.TestingValuesLake; -import org.apache.fluss.lake.values.tiering.TestingValuesLakeTieringFactory; -import org.apache.fluss.lake.writer.LakeTieringFactory; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.InternalRow; import org.apache.fluss.types.DataTypes; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; -import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID; -import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; @@ -57,8 +42,6 @@ class TieringFailoverITCase extends FlinkTieringTestBase { protected static final String DEFAULT_DB = "fluss"; - private static StreamExecutionEnvironment execEnv; - private static final Schema schema = Schema.newBuilder() .column("f_int", DataTypes.INT()) @@ -69,21 +52,23 @@ class TieringFailoverITCase extends FlinkTieringTestBase { @BeforeAll protected static void beforeAll() { FlinkTieringTestBase.beforeAll(); - execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.setParallelism(2); - execEnv.enableCheckpointing(1000); + } + + @AfterAll + protected static void afterAll() throws Exception { + FlinkTieringTestBase.afterAll(); } @Test void testTiering() throws Exception { // create a pk table, write some records and wait until snapshot finished TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable"); - long t1Id = createPkTable(t1, 1, false, schema); + long t1Id = createTable(t1, schema); TableBucket t1Bucket = new TableBucket(t1Id, 0); // write records List rows = Arrays.asList(row(1, "i1"), row(2, "i2"), row(3, "i3")); List expectedRows = new ArrayList<>(rows); - writeRows(t1, rows); + writeRows(t1, rows, false); FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(t1); // fail the first write to the pk table @@ -102,7 +87,7 @@ void testTiering() throws Exception { rows = Arrays.asList(row(1, "i11"), row(2, "i22"), row(3, "i33")); expectedRows.addAll(rows); // write records - writeRows(t1, rows); + writeRows(t1, rows, false); // check the status of replica of t1 after synced // not check start offset since we won't @@ -115,54 +100,7 @@ void testTiering() throws Exception { } } - @SuppressWarnings({"rawtypes", "unchecked"}) - protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { - Configuration flussConfig = new Configuration(clientConf); - flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); - - LakeTieringFactory lakeTieringFactory = new TestingValuesLakeTieringFactory(); - - // build tiering source - TieringSource.Builder tieringSourceBuilder = - new TieringSource.Builder<>(flussConfig, lakeTieringFactory); - if (flussConfig.get(POLL_TIERING_TABLE_INTERVAL) != null) { - tieringSourceBuilder.withPollTieringTableIntervalMs( - flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis()); - } - TieringSource tieringSource = tieringSourceBuilder.build(); - DataStreamSource source = - execEnv.fromSource( - tieringSource, - WatermarkStrategy.noWatermarks(), - "TieringSource", - TableBucketWriteResultTypeInfo.of( - () -> lakeTieringFactory.getWriteResultSerializer())); - - source.getTransformation().setUid(TIERING_SOURCE_TRANSFORMATION_UID); - - source.transform( - "TieringCommitter", - CommittableMessageTypeInfo.of( - () -> lakeTieringFactory.getCommittableSerializer()), - new TieringCommitOperatorFactory( - flussConfig, - Configuration.fromMap(Collections.emptyMap()), - lakeTieringFactory)) - .setParallelism(1) - .setMaxParallelism(1) - .sinkTo(new DiscardingSink()) - .name("end") - .setParallelism(1); - String jobName = - execEnv.getConfiguration() - .getOptional(PipelineOptions.NAME) - .orElse("Fluss Lake Tiering FailOver IT Test."); - - return execEnv.executeAsync(jobName); - } - - private void checkDataInValuesTable(TablePath tablePath, List expectedRows) - throws Exception { + private void checkDataInValuesTable(TablePath tablePath, List expectedRows) { Iterator actualIterator = getValuesRecords(tablePath).iterator(); Iterator iterator = expectedRows.iterator(); while (iterator.hasNext() && actualIterator.hasNext()) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java new file mode 100644 index 0000000000..fe81b5a842 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; +import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; +import org.apache.fluss.utils.ExceptionUtils; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue; +import static org.assertj.core.api.Assertions.assertThat; + +/** The IT case for tiering. */ +class TieringITCase extends FlinkTieringTestBase { + + @BeforeAll + protected static void beforeAll() { + FlinkTieringTestBase.beforeAll(); + } + + @AfterAll + protected static void afterAll() throws Exception { + FlinkTieringTestBase.afterAll(); + } + + @BeforeEach + @Override + void beforeEach() { + execEnv = + StreamExecutionEnvironment.getExecutionEnvironment() + .setParallelism(1) + .setRuntimeMode(RuntimeExecutionMode.STREAMING); + } + + @Test + void testTieringReachMaxDuration() throws Exception { + TablePath logTablePath = TablePath.of("fluss", "logtable"); + createTable(logTablePath, false); + TablePath pkTablePath = TablePath.of("fluss", "pktable"); + createTable(pkTablePath, true); + + // write some records to log table + List rows = new ArrayList<>(); + int recordCount = 6; + for (int i = 0; i < recordCount; i++) { + rows.add(GenericRow.of(i, BinaryString.fromString("v" + i))); + } + writeRows(logTablePath, rows, true); + + rows = new ArrayList<>(); + // write 6 records to primary key table, each bucket should only contain few record + for (int i = 0; i < recordCount; i++) { + rows.add(GenericRow.of(i, BinaryString.fromString("v" + i))); + } + writeRows(pkTablePath, rows, false); + + FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(pkTablePath); + + // set tiering duration to a small value for testing purpose + Configuration lakeTieringConfig = new Configuration(); + JobClient jobClient = buildTieringJob(execEnv, lakeTieringConfig); + + try { + // verify the tiered records is less than the table total record to + // make sure tiering is forced to complete when reach max duration + LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath); + long tieredRecords = countTieredRecords(logTableLakeSnapshot); + assertThat(tieredRecords).isLessThan(recordCount); + + // verify the tiered records is less than the table total record to + // make sure tiering is forced to complete when reach max duration + LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath); + tieredRecords = countTieredRecords(pkTableLakeSnapshot); + assertThat(tieredRecords).isLessThan(recordCount); + } finally { + jobClient.cancel(); + } + } + + private long countTieredRecords(LakeSnapshot lakeSnapshot) { + return lakeSnapshot.getTableBucketsOffset().values().stream() + .mapToLong(Long::longValue) + .sum(); + } + + private LakeSnapshot waitLakeSnapshot(TablePath tablePath) { + return waitValue( + () -> { + try { + return Optional.of(admin.getLatestLakeSnapshot(tablePath).get()); + } catch (Exception e) { + if (ExceptionUtils.stripExecutionException(e) + instanceof LakeTableSnapshotNotExistException) { + return Optional.empty(); + } + throw e; + } + }, + Duration.ofSeconds(30), + "Fail to wait for one round of tiering finish for table " + tablePath); + } + + private void createTable(TablePath tablePath, boolean isPrimaryKeyTable) throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()); + if (isPrimaryKeyTable) { + schemaBuilder.primaryKey("a"); + } + + // see TestingPaimonStoragePlugin#TestingPaimonWriter, we set write-pause + // to 1s to make it easy to mock tiering reach max duration + Map customProperties = Collections.singletonMap("write-pause", "1s"); + createTable( + tablePath, + 3, + Collections.singletonList("a"), + schemaBuilder.build(), + customProperties); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java new file mode 100644 index 0000000000..9e9de2c792 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering.source; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.tiering.TestingLakeTieringFactory; +import org.apache.fluss.flink.tiering.TestingWriteResult; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; +import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; +import org.apache.fluss.flink.utils.FlinkTestBase; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; + +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; +import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; + +import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; +import static org.apache.fluss.testutils.DataTestUtils.row; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link TieringSourceReader}. */ +class TieringSourceReaderTest extends FlinkTestBase { + + @Test + void testHandleTieringReachMaxDurationEvent() throws Exception { + TablePath tablePath = TablePath.of("fluss", "test_tiering_reach_max_duration"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig()); + conf.set( + ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER, + ConfigOptions.NoKeyAssigner.ROUND_ROBIN); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + elementsQueue = new FutureCompletingBlockingQueue<>(16); + TestingReaderContext readerContext = new TestingReaderContext(); + try (TieringSourceReader reader = + new TieringSourceReader<>( + elementsQueue, + readerContext, + connection, + new TestingLakeTieringFactory(), + Duration.ofMillis(500))) { + + reader.start(); + + // no data, add a split for the table, + // should be force be complete after reach max duration + TieringLogSplit split = + new TieringLogSplit( + tablePath, new TableBucket(tableId, 0), null, EARLIEST_OFFSET, 100); + reader.addSplits(Collections.singletonList(split)); + + // send TieringReachMaxDurationEvent + TieringReachMaxDurationEvent event = new TieringReachMaxDurationEvent(tableId); + reader.handleSourceEvents(event); + + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> output = + new TestingReaderOutput<>(); + // should force to finish, the write result is null + reader.pollNext(output); + assertThat(output.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output.getEmittedRecords().get(0); + assertThat(result.writeResult()).isNull(); + }); + + // write some data + writeRows( + connection, + tablePath, + Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")), + true); + split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 2), + null, + EARLIEST_OFFSET, + // use 100L as end offset, so that + // tiering won't be finished if no tiering reach max duration logic + 100L); + + reader.addSplits(Collections.singletonList(split)); + + // wait to run one round of tiering to do some tiering + FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + blockingQueue = getElementsQueue(reader); + // wait blockingQueue is not empty to make sure we have one fetch + // in tiering source reader + waitUntil( + () -> !blockingQueue.isEmpty(), + Duration.ofSeconds(30), + "Fail to wait element queue is not empty."); + + // send TieringReachMaxDurationEvent + event = new TieringReachMaxDurationEvent(tableId); + reader.handleSourceEvents(event); + + // make sure tiering will be finished, still maintain the result + // of previous tiering + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> + output1 = new TestingReaderOutput<>(); + + // should force to finish, the write result isn't null + reader.pollNext(output1); + assertThat(output1.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output1.getEmittedRecords().get(0); + TestingWriteResult testingWriteResult = result.writeResult(); + assertThat(testingWriteResult).isNotNull(); + assertThat(result.logEndOffset()).isEqualTo(1); + }); + + // test add split with skipCurrentRound + split = + new TieringLogSplit( + tablePath, + new TableBucket(tableId, 1), + null, + EARLIEST_OFFSET, + 100L); + split.skipCurrentRound(); + reader.addSplits(Collections.singletonList(split)); + // should skip tiering for this split + retry( + Duration.ofMinutes(1), + () -> { + TestingReaderOutput> + output1 = new TestingReaderOutput<>(); + // should force to finish, and the result is null + reader.pollNext(output1); + assertThat(output1.getEmittedRecords()).hasSize(1); + TableBucketWriteResult result = + output1.getEmittedRecords().get(0); + assertThat(result.writeResult()).isNull(); + }); + } + } + } + + /** + * Get the elementsQueue from TieringSourceReader using reflection. + * + * @param reader the TieringSourceReader instance + * @return the elementsQueue field value + */ + @SuppressWarnings("unchecked") + private FutureCompletingBlockingQueue< + RecordsWithSplitIds>> + getElementsQueue(TieringSourceReader reader) throws Exception { + Class clazz = reader.getClass(); + while (clazz != null) { + try { + Field elementsQueueField = clazz.getDeclaredField("elementsQueue"); + elementsQueueField.setAccessible(true); + return (FutureCompletingBlockingQueue< + RecordsWithSplitIds>>) + elementsQueueField.get(reader); + } catch (NoSuchFieldException e) { + // Try parent class + clazz = clazz.getSuperclass(); + } + } + throw new RuntimeException("No elementsQueue field found"); + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java index 15f62f3d1b..7ea150049e 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java @@ -17,30 +17,35 @@ package org.apache.fluss.flink.tiering.source.enumerator; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.flink.tiering.event.FailedTieringEvent; import org.apache.fluss.flink.tiering.event.FinishedTieringEvent; +import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent; import org.apache.fluss.flink.tiering.source.TieringTestBase; import org.apache.fluss.flink.tiering.source.split.TieringLogSplit; import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplit; import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket; import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +53,7 @@ import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET; import static org.apache.fluss.config.ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link TieringSourceEnumerator} and {@link TieringSplitGenerator}. */ @@ -75,8 +81,7 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { // test get snapshot split & log split and the assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -119,25 +124,23 @@ void testPrimaryKeyTableWithNoSnapshotSplits() throws Throwable { } waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -156,8 +159,7 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { // test get snapshot split assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -166,24 +168,22 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { registerReaderAndHandleSplitRequests(context, enumerator, numSubtasks, 0); waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 3000L); - Map> expectedSnapshotAssignment = new HashMap<>(); + List expectedSnapshotAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedSnapshotAssignment.put( - tableBucket, - Collections.singletonList( - new TieringSnapshotSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - snapshotId, - bucketOffsetOfInitialWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualSnapshotAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualSnapshotAssignment.putAll(a.assignment()); + expectedSnapshotAssignment.add( + new TieringSnapshotSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + snapshotId, + bucketOffsetOfInitialWrite.get(tableBucket), + expectNumberOfSplits)); } - assertThat(actualSnapshotAssignment).isEqualTo(expectedSnapshotAssignment); + List actualAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); + assertThat(actualAssignment) + .containsExactlyInAnyOrderElementsOf(expectedSnapshotAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); @@ -212,25 +212,23 @@ void testPrimaryKeyTableWithSnapshotSplits() throws Throwable { // three log splits will be ready soon waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -243,8 +241,7 @@ void testLogTableSplits() throws Throwable { // test get log split and the assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -273,14 +270,12 @@ void testLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); - assertThat(actualAssignment).isEqualTo(expectedAssignment); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); - final Map bucketOffsetOfEarliest = new HashMap<>(); final Map bucketOffsetOfInitialWrite = new HashMap<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - bucketOffsetOfEarliest.put(tableBucket, EARLIEST_OFFSET); bucketOffsetOfInitialWrite.put( tableBucket, bucketOffsetOfFirstWrite.getOrDefault(tableBucket, 0L)); } @@ -303,25 +298,23 @@ void testLogTableSplits() throws Throwable { waitUntilTieringTableSplitAssignmentReady(context, DEFAULT_BUCKET_NUM, 500L); - Map> expectedLogAssignment = new HashMap<>(); + List expectedLogAssignment = new ArrayList<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - expectedLogAssignment.put( - tableBucket, - Collections.singletonList( - new TieringLogSplit( - tablePath, - new TableBucket(tableId, tableBucket), - null, - bucketOffsetOfInitialWrite.get(tableBucket), - bucketOffsetOfInitialWrite.get(tableBucket) - + bucketOffsetOfSecondWrite.get(tableBucket), - expectNumberOfSplits))); - } - Map> actualLogAssignment = new HashMap<>(); - for (SplitsAssignment a : context.getSplitsAssignmentSequence()) { - actualLogAssignment.putAll(a.assignment()); + expectedLogAssignment.add( + new TieringLogSplit( + tablePath, + new TableBucket(tableId, tableBucket), + null, + bucketOffsetOfInitialWrite.get(tableBucket), + bucketOffsetOfInitialWrite.get(tableBucket) + + bucketOffsetOfSecondWrite.get(tableBucket), + expectNumberOfSplits)); } - assertThat(actualLogAssignment).isEqualTo(expectedLogAssignment); + List actualLogAssignment = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(actualLogAssignment::addAll)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -339,8 +332,7 @@ void testPartitionedPrimaryKeyTable() throws Throwable { // test get snapshot split assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -421,8 +413,8 @@ void testPartitionedPrimaryKeyTable() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } - assertThat(sortSplits(actualLogAssignment)) - .isEqualTo(sortSplits(expectedLogAssignment)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -440,8 +432,7 @@ void testPartitionedLogTableSplits() throws Throwable { // test get log split assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -478,7 +469,7 @@ void testPartitionedLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualAssignment::addAll); } - assertThat(sortSplits(actualAssignment)).isEqualTo(sortSplits(expectedAssignment)); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock finished tiered this round, check second round context.getSplitsAssignmentSequence().clear(); @@ -486,10 +477,8 @@ void testPartitionedLogTableSplits() throws Throwable { long snapshot = 1; for (Map.Entry partitionNameById : partitionNameByIds.entrySet()) { long partitionId = partitionNameById.getValue(); - Map partitionInitialBucketOffsets = new HashMap<>(); Map partitionBucketOffsetOfInitialWrite = new HashMap<>(); for (int tableBucket = 0; tableBucket < DEFAULT_BUCKET_NUM; tableBucket++) { - partitionInitialBucketOffsets.put(tableBucket, EARLIEST_OFFSET); partitionBucketOffsetOfInitialWrite.put( tableBucket, bucketOffsetOfFirstWrite @@ -547,8 +536,8 @@ void testPartitionedLogTableSplits() throws Throwable { context.getSplitsAssignmentSequence()) { splitsAssignment.assignment().values().forEach(actualLogAssignment::addAll); } - assertThat(sortSplits(actualLogAssignment)) - .isEqualTo(sortSplits(expectedLogAssignment)); + assertThat(actualLogAssignment) + .containsExactlyInAnyOrderElementsOf(expectedLogAssignment); } } @@ -563,8 +552,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { // test get log split and the assignment try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(numSubtasks)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -588,7 +576,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment::addAll)); - assertThat(actualAssignment).isEqualTo(expectedAssignment); + assertThat(actualAssignment).containsExactlyInAnyOrderElementsOf(expectedAssignment); // mock tiering fail by send tiering fail event context.getSplitsAssignmentSequence().clear(); @@ -602,7 +590,7 @@ void testHandleFailedTieringTableEvent() throws Throwable { List actualAssignment1 = new ArrayList<>(); context.getSplitsAssignmentSequence() .forEach(a -> a.assignment().values().forEach(actualAssignment1::addAll)); - assertThat(actualAssignment1).isEqualTo(expectedAssignment); + assertThat(actualAssignment1).containsExactlyInAnyOrderElementsOf(expectedAssignment); } } @@ -618,8 +606,7 @@ void testHandleReaderFailOver() throws Throwable { try (FlussMockSplitEnumeratorContext context = new FlussMockSplitEnumeratorContext<>(3)) { - TieringSourceEnumerator enumerator = - new TieringSourceEnumerator(flussConf, context, 500); + TieringSourceEnumerator enumerator = createTieringSourceEnumerator(flussConf, context); enumerator.start(); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); @@ -646,11 +633,16 @@ void testHandleReaderFailOver() throws Throwable { // readers failover again: Enumerator marks tablePath2 as failed and clears its splits // register readers and process split requests for attempt 2 - registerReaderAndHandleSplitRequests(context, enumerator, 3, 2); + registerReaderAndHandleSplitRequests(context, enumerator, 2, 2); // now, should get no tiering split, since all pending split for tablePath1 and - // tablePath2 is clear + // tablePath2 is clear and there still one sub-task is not registered verifyTieringSplitAssignment(context, 0, tablePath2); + + // register reader 2 again, should get tiering split for table1 since the failover is + // finished, and reader2 request tiering split + registerSingleReaderAndHandleSplitRequests(context, enumerator, 2, 2); + verifyTieringSplitAssignment(context, 3, tablePath1); } } @@ -684,12 +676,21 @@ private void registerReaderAndHandleSplitRequests( int numSubtasks, int attemptNumber) { for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { - context.registerSourceReader(subtaskId, attemptNumber, "localhost-" + subtaskId); - enumerator.addReader(subtaskId); - enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId); + registerSingleReaderAndHandleSplitRequests( + context, enumerator, subtaskId, attemptNumber); } } + private void registerSingleReaderAndHandleSplitRequests( + FlussMockSplitEnumeratorContext context, + TieringSourceEnumerator enumerator, + int subtaskId, + int attemptNumber) { + context.registerSourceReader(subtaskId, attemptNumber, "localhost-" + subtaskId); + enumerator.addReader(subtaskId); + enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId); + } + private void waitUntilTieringTableSplitAssignmentReady( FlussMockSplitEnumeratorContext context, int expectedSplitsNum, @@ -705,12 +706,6 @@ private void waitUntilTieringTableSplitAssignmentReady( } } - private static List sortSplits(List splits) { - return splits.stream() - .sorted(Comparator.comparing(Object::toString)) - .collect(Collectors.toList()); - } - private void verifyTieringSplitAssignment( FlussMockSplitEnumeratorContext context, int expectedSplitSize, @@ -729,4 +724,118 @@ private void verifyTieringSplitAssignment( assertThat(allTieringSplits) .allMatch(tieringSplit -> tieringSplit.getTablePath().equals(expectedTablePath)); } + + private TieringSourceEnumerator createTieringSourceEnumerator( + Configuration flussConf, MockSplitEnumeratorContext context) { + return new TieringSourceEnumerator(flussConf, context, 500); + } + + @Test + void testTableReachMaxTieringDuration() throws Throwable { + TablePath tablePath = TablePath.of(DEFAULT_DB, "tiering-max-duration-test-log-table"); + long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR); + int numSubtasks = 2; + + try (FlussMockSplitEnumeratorContext context = + new FlussMockSplitEnumeratorContext<>(numSubtasks); + TieringSourceEnumerator enumerator = + createTieringSourceEnumerator(flussConf, context); ) { + enumerator.start(); + + // Register all readers + for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) { + context.registerSourceReader(subtaskId, subtaskId, "localhost-" + subtaskId); + } + + appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10); + + for (int subTask = 0; subTask < numSubtasks; subTask++) { + enumerator.handleSplitRequest(subTask, "localhost-" + subTask); + } + + // Wait for initial assignment + waitUntilTieringTableSplitAssignmentReady(context, 2, 200L); + + retry( + Duration.ofSeconds(30), + () -> { + // Verify that TieringReachMaxDurationEvent was sent to all readers + // Use reflection to access events sent to readers + Map> eventsToReaders = + context.getSentSourceEvent(); + assertThat(eventsToReaders).hasSize(numSubtasks); + for (Map.Entry> entry : + eventsToReaders.entrySet()) { + assertThat(entry.getValue()) + .containsExactly(new TieringReachMaxDurationEvent(tableId)); + } + }); + + // clear split assignment + context.getSplitsAssignmentSequence().clear(); + + // request a split again + enumerator.handleSplitRequest(0, "localhost-0"); + + // the split should be marked as skipCurrentRound + waitUntilTieringTableSplitAssignmentReady(context, 1, 100L); + + List assignedSplits = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(assignedSplits::addAll)); + assertThat(assignedSplits).hasSize(1); + assertThat(assignedSplits.get(0).shouldSkipCurrentRound()).isTrue(); + + // alter table freshness to 10 min to make sure we won't assign in + // normal finish + conn.getAdmin() + .alterTable( + tablePath, + Collections.singletonList( + TableChange.set( + ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(), "10min")), + false) + .get(); + + // Mock tiering finished + // This simulates the reader finishing tiering after reaching max duration + enumerator.handleSourceEvent(0, new FinishedTieringEvent(tableId)); + + // Clear split assignment to prepare for next round + context.getSplitsAssignmentSequence().clear(); + + // Run periodic callable to trigger heartbeat, which will report force finish to + // coordinator + // The coordinator will move the table from Tiered to Pending state + if (!context.getPeriodicCallables().isEmpty()) { + context.runPeriodicCallable(0); + } + + // Request table again - since the table was force finished, it should be immediately + // available in Pending state + for (int subTask = 0; subTask < numSubtasks; subTask++) { + enumerator.handleSplitRequest(subTask, "localhost-" + subTask); + } + + // Run periodic callable again to request table from coordinator + if (!context.getPeriodicCallables().isEmpty()) { + context.runPeriodicCallable(0); + } + + // The table should be immediately requested again (epoch should be 2) + // Wait for the table to be assigned again + waitUntilTieringTableSplitAssignmentReady(context, 2, 500L); + + List reassignedSplits = new ArrayList<>(); + context.getSplitsAssignmentSequence() + .forEach(a -> a.assignment().values().forEach(reassignedSplits::addAll)); + assertThat(reassignedSplits).hasSize(2); + // Verify that the table is requested again with a new epoch (epoch 2) + assertThat(reassignedSplits) + .allMatch( + split -> + split.getTableBucket().getTableId() == tableId + && !split.shouldSkipCurrentRound()); + } + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java index 33356dec40..45cbd76a80 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializerTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -68,8 +69,8 @@ void testTieringSnapshotSplitStringExpression(Boolean isPartitionedTable) throws String expectedSplitString = isPartitionedTable - ? "TieringSnapshotSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', snapshotId=0, logOffsetOfSnapshot=200, numberOfSplits=30}" - : "TieringSnapshotSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', snapshotId=0, logOffsetOfSnapshot=200, numberOfSplits=30}"; + ? "TieringSnapshotSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', numberOfSplits=30, skipCurrentRound=false, snapshotId=0, logOffsetOfSnapshot=200}" + : "TieringSnapshotSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', numberOfSplits=30, skipCurrentRound=false, snapshotId=0, logOffsetOfSnapshot=200}"; assertThat(new TieringSnapshotSplit(path, bucket, partitionName, 0L, 200L, 30).toString()) .isEqualTo(expectedSplitString); } @@ -102,9 +103,52 @@ void testTieringLogSplitStringExpression(Boolean isPartitionedTable) throws Exce String expectedSplitString = isPartitionedTable - ? "TieringLogSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', startingOffset=100, stoppingOffset=200, numberOfSplits=2}" - : "TieringLogSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', startingOffset=100, stoppingOffset=200, numberOfSplits=2}"; + ? "TieringLogSplit{tablePath=test_db.test_partitioned_table, tableBucket=TableBucket{tableId=1, partitionId=100, bucket=2}, partitionName='1024', numberOfSplits=2, skipCurrentRound=false, startingOffset=100, stoppingOffset=200}" + : "TieringLogSplit{tablePath=test_db.test_table, tableBucket=TableBucket{tableId=1, bucket=2}, partitionName='null', numberOfSplits=2, skipCurrentRound=false, startingOffset=100, stoppingOffset=200}"; assertThat(new TieringLogSplit(path, bucket, partitionName, 100, 200, 2).toString()) .isEqualTo(expectedSplitString); } + + @Test + void testSkipCurrentRoundSerde() throws Exception { + // Test TieringSnapshotSplit with skipCurrentRound set at creation + TieringSnapshotSplit snapshotSplitWithSkipCurrentRound = + new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, true); + byte[] serialized = serializer.serialize(snapshotSplitWithSkipCurrentRound); + TieringSnapshotSplit deserializedSnapshotSplit = + (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplitWithSkipCurrentRound); + + // Test TieringLogSplit with skipCurrentRound set at creation + TieringLogSplit logSplitWithSkipCurrentRound = + new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, true); + serialized = serializer.serialize(logSplitWithSkipCurrentRound); + TieringLogSplit deserializedLogSplit = + (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedLogSplit).isEqualTo(logSplitWithSkipCurrentRound); + + // Test TieringSnapshotSplit with skipCurrentRound set after creation + TieringSnapshotSplit snapshotSplit = + new TieringSnapshotSplit(tablePath, tableBucket, null, 0L, 200L, 10, false); + assertThat(snapshotSplit.shouldSkipCurrentRound()).isFalse(); + snapshotSplit.skipCurrentRound(); + assertThat(snapshotSplit.shouldSkipCurrentRound()).isTrue(); + + serialized = serializer.serialize(snapshotSplit); + deserializedSnapshotSplit = + (TieringSnapshotSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedSnapshotSplit).isEqualTo(snapshotSplit); + + // Test TieringLogSplit with skipCurrentRound set after creation + TieringLogSplit logSplit = + new TieringLogSplit(tablePath, tableBucket, null, 100, 200, 40, false); + assertThat(logSplit.shouldSkipCurrentRound()).isFalse(); + logSplit.skipCurrentRound(); + assertThat(logSplit.shouldSkipCurrentRound()).isTrue(); + + serialized = serializer.serialize(logSplit); + deserializedLogSplit = + (TieringLogSplit) serializer.deserialize(serializer.getVersion(), serialized); + assertThat(deserializedLogSplit).isEqualTo(logSplit); + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java index 3a9c785788..e966fb2d4f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java @@ -35,7 +35,7 @@ public class TestingValuesLakeTieringFactory @Override public LakeWriter createLakeWriter( WriterInitContext writerInitContext) throws IOException { - return new TestingValuesLakeWriter(writerInitContext.tablePath().toString()); + return new TestingValuesLakeWriter(writerInitContext.tableInfo()); } @Override diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java index 480ebe3077..2764ba0f7c 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java @@ -18,29 +18,46 @@ package org.apache.fluss.lake.values.tiering; +import org.apache.fluss.config.ConfigOption; import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.lake.values.TestingValuesLake; import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.record.LogRecord; import org.apache.fluss.utils.InstantiationUtils; import java.io.IOException; import java.io.Serializable; +import java.time.Duration; import java.util.UUID; +import static org.apache.fluss.config.ConfigBuilder.key; + /** Implementation of {@link LakeWriter} for values lake. */ public class TestingValuesLakeWriter implements LakeWriter { private final String tableId; private final String writerId; + static ConfigOption writePauseOption = + key("write-pause").durationType().noDefaultValue(); + private final Duration writePause; - public TestingValuesLakeWriter(String tableId) { - this.tableId = tableId; + public TestingValuesLakeWriter(TableInfo tableInfo) { + this.tableId = tableInfo.getTablePath().toString(); this.writerId = UUID.randomUUID().toString(); + this.writePause = tableInfo.getCustomProperties().get(writePauseOption); } @Override public void write(LogRecord record) throws IOException { + try { + if (writePause != null) { + Thread.sleep(writePause.toMillis()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while pausing before write", e); + } TestingValuesLake.writeRecord(tableId, writerId, record); } diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..a9cba3f476 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -540,6 +540,12 @@ message LakeTieringHeartbeatRequest { repeated PbHeartbeatReqForTable failed_tables = 3; // whether to request a table optional bool request_table = 4; + // Table IDs that were force finished due to reaching the maximum tiering duration. + // When a table's tiering operation exceeds the max duration (data lake freshness), + // it will be force finished to prevent it from blocking other tables' tiering operations. + // These table IDs must also appear in finished_tables. The coordinator uses this field to + // distinguish between normally finished tables and force finished tables. + repeated int64 force_finished_tables = 5; } message LakeTieringHeartbeatResponse { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 94d56dd338..05bc28e18a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -150,9 +150,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -743,6 +745,12 @@ public CompletableFuture lakeTieringHeartbeat( } } + // process force finished tables + Set forceFinishedTableId = new HashSet<>(); + for (long forceFinishTableId : request.getForceFinishedTables()) { + forceFinishedTableId.add(forceFinishTableId); + } + // process finished tables for (PbHeartbeatReqForTable finishTable : request.getFinishedTablesList()) { PbHeartbeatRespForTable pbHeartbeatRespForTable = @@ -750,7 +758,9 @@ public CompletableFuture lakeTieringHeartbeat( try { validateHeartbeatRequest(finishTable, currentCoordinatorEpoch); lakeTableTieringManager.finishTableTiering( - finishTable.getTableId(), finishTable.getTieringEpoch()); + finishTable.getTableId(), + finishTable.getTieringEpoch(), + forceFinishedTableId.contains(finishTable.getTableId())); } catch (Throwable e) { pbHeartbeatRespForTable.setError(ApiError.fromThrowable(e).toErrorResponse()); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java index c25c779691..a5267e0b7d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java @@ -83,19 +83,22 @@ * ▼ ▼ * ┌──────────┐ (lake freshness > tiering interval) * │Scheduled ├──────┐ - * └──────────┘ ▼ - * ▲ ┌───────┐ (assign to tier service) ┌───────┐ - * | |Pending├──────────────────────────►|Tiering├─┐ - * | └───────┘ └───┬───┘ │ - * | ▲ ┌─────────────────┘ │ - * | | | (timeout or failure) | (finished) - * | | ▼ ▼ - * | | (retry) ┌─────────┐ ┌────────┐ - * | └────────────│ Failed │ │ Tiered │ - * | └─────────┘ └────┬───┘ - * | | - * └──────────────────────────────────────────────────────┘ - * (ready for next round of tiering) + * └─────▲────┘ ▼ + * │ ┌───────┐ (assign to tier service) ┌───────┐ + * │ |Pending├──────────────────────────►|Tiering├─┐ + * │ └───▲───┘ └───┬───┘ │ + * │ │ ┌─────────────────┘ │ + * │ │ | (timeout or failure) | (finished) + * │ │ ▼ ▼ + * │ │ (retry) ┌─────────┐ ┌────────┐ + * │ │ │ Failed │ │ Tiered │ + * │ │ └─────────┘ └───┬────┘ + * │ │ │ + * │ │ (force finished) │ (ready for next round) + * │ └───────────────────────────────────────-┘ + * │ │ + * └───────────────────────────────────────────────────┘ + * (ready for next round of tiering) * } */ public class LakeTableTieringManager implements AutoCloseable { @@ -330,15 +333,20 @@ public LakeTieringTableInfo requestTable() { }); } - public void finishTableTiering(long tableId, long tieredEpoch) { + public void finishTableTiering(long tableId, long tieredEpoch, boolean isForceFinished) { inLock( lock, () -> { validateTieringServiceRequest(tableId, tieredEpoch); // to tiered state firstly doHandleStateChange(tableId, TieringState.Tiered); - // then to scheduled state to enable other tiering service can pick it - doHandleStateChange(tableId, TieringState.Scheduled); + if (isForceFinished) { + // add to pending again since it's forced to finish + doHandleStateChange(tableId, TieringState.Pending); + } else { + // then to scheduled state to enable other tiering service can pick it + doHandleStateChange(tableId, TieringState.Scheduled); + } }); } @@ -420,7 +428,17 @@ private void validateTieringServiceRequest(long tableId, long tieringEpoch) { *

Tiering -> Failed * *

-- When the tiering service timeout to report heartbeat or report failure for the table, - * do: transmit to Tiered state + * do: transmit to Failed state + * + *

Tiered -> Pending + * + *

-- When the tiering is force finished due to exceeding the specified tiering duration, do: + * transmit to Pending state to enable immediate re-tiering + * + *

Tiered -> Scheduled + * + *

-- When the tiering is normally finished, do: transmit to Scheduled state to wait for the + * next round of tiering */ private void doHandleStateChange(long tableId, TieringState targetState) { TieringState currentState = tieringStates.get(tableId); @@ -570,7 +588,7 @@ public Set validPreviousStates() { Pending { @Override public Set validPreviousStates() { - return EnumSet.of(Scheduled, Failed); + return EnumSet.of(Scheduled, Failed, Tiered); } }, // When one tiering service is tiering the table, the state will be Tiering diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java index fb0b75b18a..80b589dc06 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java @@ -131,7 +131,7 @@ void testRemoveLakeTable() { assertThatThrownBy(() -> tableTieringManager.reportTieringFail(tableId1, 1)) .isInstanceOf(TableNotExistException.class) .hasMessage("The table %d doesn't exist.", tableId1); - assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1)) + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1, false)) .isInstanceOf(TableNotExistException.class) .hasMessage("The table %d doesn't exist.", tableId1); } @@ -152,7 +152,7 @@ void testFinishTableTieringReTriggerSchedule() { assertThat(tableTieringManager.requestTable()).isNull(); // mock lake tiering finish one-round tiering - tableTieringManager.finishTableTiering(tableId1, tieredEpoch); + tableTieringManager.finishTableTiering(tableId1, tieredEpoch, false); // not advance time, request table should return null assertThat(tableTieringManager.requestTable()).isNull(); @@ -211,12 +211,12 @@ void testTieringServiceTimeOutReTriggerPending() { .hasMessage( "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", 1, 2, tableId1); - assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1)) + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1, false)) .isInstanceOf(FencedTieringEpochException.class) .hasMessage( "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", 1, 2, tableId1); - assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 3)) + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 3, false)) .isInstanceOf(FencedTieringEpochException.class) .hasMessage( "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", @@ -238,6 +238,29 @@ void testTieringFail() { assertRequestTable(tableId1, tablePath1, 2); } + @Test + void testForceFinishTableTieringImmediatelyRePending() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + manualClock.advanceTime(Duration.ofSeconds(10)); + // check requested table + assertRequestTable(tableId1, tablePath1, 1); + + // request table should return null since the table is tiering + assertThat(tableTieringManager.requestTable()).isNull(); + + // mock lake tiering force finish (e.g., due to exceeding tiering duration) + tableTieringManager.finishTableTiering(tableId1, 1, true); + // should immediately be re-pending and can be requested again without waiting + assertRequestTable(tableId1, tablePath1, 2); + + // verify it can be requested again immediately after force finish + // request table should return null since the table is tiering + assertThat(tableTieringManager.requestTable()).isNull(); + } + private TableInfo createTableInfo(long tableId, TablePath tablePath, Duration freshness) { TableDescriptor tableDescriptor = TableDescriptor.builder() diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java index aa030d0256..d21c7174ce 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java @@ -17,23 +17,42 @@ package org.apache.fluss.server.lakehouse; +import org.apache.fluss.config.ConfigOption; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; +import org.apache.fluss.lake.committer.CommittedLakeSnapshot; +import org.apache.fluss.lake.committer.CommitterInitContext; +import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.lake.lakestorage.LakeStorage; import org.apache.fluss.lake.lakestorage.LakeStoragePlugin; +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer; import org.apache.fluss.lake.source.LakeSource; import org.apache.fluss.lake.writer.LakeTieringFactory; +import org.apache.fluss.lake.writer.LakeWriter; +import org.apache.fluss.lake.writer.WriterInitContext; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.LogRecord; +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.fluss.config.ConfigBuilder.key; + /** A plugin of paimon just for testing purpose. */ public class TestingPaimonStoragePlugin implements LakeStoragePlugin { @@ -54,7 +73,7 @@ public static class TestingPaimonLakeStorage implements LakeStorage { @Override public LakeTieringFactory createLakeTieringFactory() { - throw new UnsupportedOperationException("createLakeTieringFactory is not supported."); + return new TestingPaimonTieringFactory(); } @Override @@ -96,4 +115,154 @@ public TableDescriptor getTable(TablePath tablePath) { return tableByPath.get(tablePath); } } + + private static class TestingPaimonTieringFactory + implements LakeTieringFactory { + + @Override + public LakeWriter createLakeWriter( + WriterInitContext writerInitContext) { + return new TestingPaimonWriter(writerInitContext.tableInfo()); + } + + @Override + public SimpleVersionedSerializer getWriteResultSerializer() { + return new TestingPaimonWriteResultSerializer(); + } + + @Override + public LakeCommitter createLakeCommitter( + CommitterInitContext committerInitContext) throws IOException { + return new TestingPaimonCommitter(); + } + + @Override + public SimpleVersionedSerializer getCommittableSerializer() { + return new SimpleVersionedSerializer() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(TestPaimonCommittable obj) throws IOException { + return new byte[0]; + } + + @Override + public TestPaimonCommittable deserialize(int version, byte[] serialized) + throws IOException { + return new TestPaimonCommittable(); + } + }; + } + } + + private static class TestingPaimonWriter implements LakeWriter { + + static ConfigOption writePauseOption = + key("write-pause").durationType().noDefaultValue(); + + private int writtenRecords = 0; + private final Duration writePause; + + private TestingPaimonWriter(TableInfo tableInfo) { + this.writePause = tableInfo.getCustomProperties().get(writePauseOption); + } + + @Override + public void write(LogRecord record) throws IOException { + try { + if (writePause != null) { + Thread.sleep(writePause.toMillis()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while pausing before write", e); + } + writtenRecords += 1; + } + + @Override + public TestingPaimonWriteResult complete() throws IOException { + return new TestingPaimonWriteResult(writtenRecords); + } + + @Override + public void close() throws IOException { + // do nothing + } + } + + private static class TestingPaimonWriteResult { + private final int writtenRecords; + + public TestingPaimonWriteResult(int writtenRecords) { + this.writtenRecords = writtenRecords; + } + } + + private static class TestingPaimonWriteResultSerializer + implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 1; + } + + @Override + public byte[] serialize(TestingPaimonWriteResult result) throws IOException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeInt(result.writtenRecords); + return baos.toByteArray(); + } + } + + @Override + public TestingPaimonWriteResult deserialize(int version, byte[] serialized) + throws IOException { + try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return new TestingPaimonWriteResult(ois.readInt()); + } + } + } + + private static class TestPaimonCommittable {} + + private static class TestingPaimonCommitter + implements LakeCommitter { + + @Override + public TestPaimonCommittable toCommittable( + List testingPaimonWriteResults) throws IOException { + return new TestPaimonCommittable(); + } + + @Override + public long commit( + TestPaimonCommittable committable, Map snapshotProperties) + throws IOException { + // do nothing, and always return 1 as committed snapshot + return 1; + } + + @Override + public void abort(TestPaimonCommittable committable) throws IOException { + // do nothing + } + + @Nullable + @Override + public CommittedLakeSnapshot getMissingLakeSnapshot( + @Nullable Long latestLakeSnapshotIdOfFluss) throws IOException { + return null; + } + + @Override + public void close() throws Exception { + // do nothing + } + } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 16e942860e..de785f7163 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -448,6 +448,7 @@ org.apache.fluss.flink.tiering.source.TieringWriterInitContext org.apache.fluss.flink.tiering.source.TieringSourceReader + org.apache.fluss.flink.tiering.source.TieringSourceFetcherManager org.apache.fluss.flink.tiering.source.TableBucketWriteResultEmitter