Skip to content

Commit 201d606

Browse files
committed
use freshness as max tiering duration and mark forced finished table as pending again
1 parent 7f250d1 commit 201d606

11 files changed

Lines changed: 232 additions & 81 deletions

File tree

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering;
1919

20-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
21-
import org.apache.flink.configuration.PipelineOptions;
22-
import org.apache.flink.core.execution.JobClient;
23-
import org.apache.flink.streaming.api.datastream.DataStreamSource;
24-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
25-
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
2620
import org.apache.fluss.config.Configuration;
2721
import org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo;
2822
import org.apache.fluss.flink.tiering.committer.TieringCommitOperatorFactory;
@@ -33,6 +27,13 @@
3327
import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
3428
import org.apache.fluss.lake.writer.LakeTieringFactory;
3529

30+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
31+
import org.apache.flink.configuration.PipelineOptions;
32+
import org.apache.flink.core.execution.JobClient;
33+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
34+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
35+
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
36+
3637
import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
3738
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
3839
import static org.apache.fluss.utils.Preconditions.checkNotNull;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering.source;
1919

20-
import org.apache.flink.api.connector.source.Boundedness;
21-
import org.apache.flink.api.connector.source.Source;
22-
import org.apache.flink.api.connector.source.SourceReader;
23-
import org.apache.flink.api.connector.source.SourceReaderContext;
24-
import org.apache.flink.api.connector.source.SplitEnumerator;
25-
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
26-
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
27-
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
28-
import org.apache.flink.core.io.SimpleVersionedSerializer;
29-
import org.apache.flink.runtime.jobgraph.OperatorID;
30-
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
3120
import org.apache.fluss.client.Connection;
3221
import org.apache.fluss.client.ConnectionFactory;
3322
import org.apache.fluss.config.Configuration;
@@ -41,6 +30,18 @@
4130
import org.apache.fluss.shaded.guava32.com.google.common.hash.Hasher;
4231
import org.apache.fluss.shaded.guava32.com.google.common.hash.Hashing;
4332

33+
import org.apache.flink.api.connector.source.Boundedness;
34+
import org.apache.flink.api.connector.source.Source;
35+
import org.apache.flink.api.connector.source.SourceReader;
36+
import org.apache.flink.api.connector.source.SourceReaderContext;
37+
import org.apache.flink.api.connector.source.SplitEnumerator;
38+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
39+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
40+
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
41+
import org.apache.flink.core.io.SimpleVersionedSerializer;
42+
import org.apache.flink.runtime.jobgraph.OperatorID;
43+
import org.apache.flink.streaming.api.graph.StreamGraphHasherV2;
44+
4445
import java.nio.charset.StandardCharsets;
4546

4647
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818

1919
package org.apache.fluss.flink.tiering.source;
2020

21+
import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
22+
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
23+
2124
import org.apache.flink.configuration.Configuration;
2225
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
2326
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
2427
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
2528
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
2629
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
27-
import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
28-
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
2930

3031
import java.util.Collection;
3132
import java.util.function.Consumer;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering.source;
1919

20-
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
21-
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
22-
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
23-
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
2420
import org.apache.fluss.annotation.VisibleForTesting;
2521
import org.apache.fluss.client.Connection;
2622
import org.apache.fluss.client.table.Table;
@@ -38,10 +34,16 @@
3834
import org.apache.fluss.metadata.TableInfo;
3935
import org.apache.fluss.metadata.TablePath;
4036
import org.apache.fluss.utils.CloseableIterator;
37+
38+
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
39+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
40+
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
41+
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
4344

4445
import javax.annotation.Nullable;
46+
4547
import java.io.IOException;
4648
import java.time.Duration;
4749
import java.util.ArrayDeque;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,6 @@
1717

1818
package org.apache.fluss.flink.tiering.source.enumerator;
1919

20-
import org.apache.flink.api.connector.source.ReaderInfo;
21-
import org.apache.flink.api.connector.source.SourceEvent;
22-
import org.apache.flink.api.connector.source.SplitEnumerator;
23-
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
24-
import org.apache.flink.api.java.tuple.Tuple3;
25-
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
26-
import org.apache.flink.util.FlinkRuntimeException;
2720
import org.apache.fluss.annotation.VisibleForTesting;
2821
import org.apache.fluss.client.Connection;
2922
import org.apache.fluss.client.ConnectionFactory;
@@ -48,10 +41,19 @@
4841
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
4942
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
5043
import org.apache.fluss.utils.MapUtils;
44+
45+
import org.apache.flink.api.connector.source.ReaderInfo;
46+
import org.apache.flink.api.connector.source.SourceEvent;
47+
import org.apache.flink.api.connector.source.SplitEnumerator;
48+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
49+
import org.apache.flink.api.java.tuple.Tuple3;
50+
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
51+
import org.apache.flink.util.FlinkRuntimeException;
5152
import org.slf4j.Logger;
5253
import org.slf4j.LoggerFactory;
5354

5455
import javax.annotation.Nullable;
56+
5557
import java.io.IOException;
5658
import java.util.ArrayList;
5759
import java.util.Collections;
@@ -102,7 +104,7 @@ public class TieringSourceEnumerator
102104

103105
private final Map<Long, Long> tieringTableEpochs;
104106
private final Map<Long, Long> failedTableEpochs;
105-
private final Map<Long, Long> finishedTableEpochs;
107+
private final Map<Long, TieringFinishState> finishedTables;
106108
private final Set<Long> tieringReachMaxDurationsTables;
107109

108110
// lazily instantiated
@@ -131,7 +133,7 @@ public TieringSourceEnumerator(
131133
this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
132134
this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>());
133135
this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
134-
this.finishedTableEpochs = MapUtils.newConcurrentHashMap();
136+
this.finishedTables = MapUtils.newConcurrentHashMap();
135137
this.failedTableEpochs = MapUtils.newConcurrentHashMap();
136138
this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>());
137139
}
@@ -252,7 +254,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
252254
"The finished table {} is not in tiering table, won't report it to Fluss to mark as finished.",
253255
finishedTableId);
254256
} else {
255-
finishedTableEpochs.put(finishedTableId, tieringEpoch);
257+
boolean isForceComplete = tieringReachMaxDurationsTables.remove(finishedTableId);
258+
finishedTables.put(
259+
finishedTableId, TieringFinishState.from(tieringEpoch, isForceComplete));
256260
}
257261
}
258262

@@ -274,7 +278,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
274278
}
275279
}
276280

277-
if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
281+
if (!finishedTables.isEmpty() || !failedTableEpochs.isEmpty()) {
278282
// call one round of heartbeat to notify table has been finished or failed
279283
this.context.callAsync(
280284
this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
@@ -288,6 +292,7 @@ private void handleSourceReaderFailOver() {
288292
// we need to make all as failed
289293
failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
290294
tieringTableEpochs.clear();
295+
tieringReachMaxDurationsTables.clear();
291296
// also clean all pending splits since we mark all as failed
292297
pendingSplits.clear();
293298
if (!failedTableEpochs.isEmpty()) {
@@ -362,13 +367,13 @@ private void assignSplits() {
362367
if (closed) {
363368
return null;
364369
}
365-
Map<Long, Long> currentFinishedTableEpochs = new HashMap<>(this.finishedTableEpochs);
370+
Map<Long, TieringFinishState> currentFinishedTables = new HashMap<>(this.finishedTables);
366371
Map<Long, Long> currentFailedTableEpochs = new HashMap<>(this.failedTableEpochs);
367372
LakeTieringHeartbeatRequest tieringHeartbeatRequest =
368373
tieringTableHeartBeat(
369374
basicHeartBeat(),
370375
this.tieringTableEpochs,
371-
currentFinishedTableEpochs,
376+
currentFinishedTables,
372377
currentFailedTableEpochs,
373378
this.flussCoordinatorEpoch);
374379

@@ -397,9 +402,9 @@ private void assignSplits() {
397402
waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
398403
}
399404

400-
// if come to here, we can remove currentFinishedTableEpochs/failedTableEpochs to avoid send
405+
// if come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
401406
// in next round
402-
currentFinishedTableEpochs.forEach(finishedTableEpochs::remove);
407+
currentFinishedTables.forEach(finishedTables::remove);
403408
currentFailedTableEpochs.forEach(failedTableEpochs::remove);
404409
return lakeTieringInfo;
405410
}
@@ -428,7 +433,7 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
428433
LOG.info(
429434
"Generate Tiering splits for table {} is empty, no need to tier data.",
430435
tieringTable.f2.getTableName());
431-
finishedTableEpochs.put(tieringTable.f0, tieringTable.f1);
436+
finishedTables.put(tieringTable.f0, TieringFinishState.from(tieringTable.f1));
432437
} else {
433438
tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
434439
pendingSplits.addAll(tieringSplits);
@@ -537,16 +542,28 @@ static LakeTieringHeartbeatRequest heartBeatWithRequestNewTieringTable(
537542
static LakeTieringHeartbeatRequest tieringTableHeartBeat(
538543
LakeTieringHeartbeatRequest heartbeatRequest,
539544
Map<Long, Long> tieringTableEpochs,
540-
Map<Long, Long> finishedTableEpochs,
545+
Map<Long, TieringFinishState> finishedTables,
541546
Map<Long, Long> failedTableEpochs,
542547
int coordinatorEpoch) {
543548
if (!tieringTableEpochs.isEmpty()) {
544549
heartbeatRequest.addAllTieringTables(
545550
toPbHeartbeatReqForTable(tieringTableEpochs, coordinatorEpoch));
546551
}
547-
if (!finishedTableEpochs.isEmpty()) {
552+
if (!finishedTables.isEmpty()) {
553+
Map<Long, Long> finishTieringEpochs = new HashMap<>();
554+
Set<Long> forceFinishedTables = new HashSet<>();
555+
finishedTables.forEach(
556+
(tableId, tieringFinishState) -> {
557+
finishTieringEpochs.put(tableId, tieringFinishState.tieringEpoch);
558+
if (tieringFinishState.isForceToFinish) {
559+
forceFinishedTables.add(tableId);
560+
}
561+
});
548562
heartbeatRequest.addAllFinishedTables(
549-
toPbHeartbeatReqForTable(finishedTableEpochs, coordinatorEpoch));
563+
toPbHeartbeatReqForTable(finishTieringEpochs, coordinatorEpoch));
564+
for (long forceFinishedTableId : forceFinishedTables) {
565+
heartbeatRequest.addForceFinishedTable(forceFinishedTableId);
566+
}
550567
}
551568
// add failed tiering table to heart beat request
552569
return failedTableHeartBeat(heartbeatRequest, failedTableEpochs, coordinatorEpoch);
@@ -590,4 +607,22 @@ static LakeTieringHeartbeatResponse waitHeartbeatResponse(
590607
}
591608
}
592609
}
610+
611+
private static class TieringFinishState {
612+
long tieringEpoch;
613+
boolean isForceToFinish;
614+
615+
public static TieringFinishState from(long tieringEpoch) {
616+
return new TieringFinishState(tieringEpoch, false);
617+
}
618+
619+
public static TieringFinishState from(long tieringEpoch, boolean isForceToFinish) {
620+
return new TieringFinishState(tieringEpoch, isForceToFinish);
621+
}
622+
623+
private TieringFinishState(long tieringEpoch, boolean isForceToFinish) {
624+
this.tieringEpoch = tieringEpoch;
625+
this.isForceToFinish = isForceToFinish;
626+
}
627+
}
593628
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.Map;
4545
import java.util.Optional;
4646

47-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_MAX_DURATION;
4847
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
4948
import static org.assertj.core.api.Assertions.assertThat;
5049

@@ -96,7 +95,6 @@ void testTieringReachMaxDuration() throws Exception {
9695

9796
// set tiering duration to a small value for testing purpose
9897
Configuration lakeTieringConfig = new Configuration();
99-
lakeTieringConfig.set(LAKE_TIERING_TABLE_MAX_DURATION, Duration.ofSeconds(1));
10098
JobClient jobClient = buildTieringJob(execEnv, lakeTieringConfig);
10199

102100
try {

0 commit comments

Comments (0)