Skip to content

Commit 53431d9

Browse files
[flink] Add Flink filter pushdown integration and documentation
1 parent 3d8df9f commit 53431d9

17 files changed

Lines changed: 1540 additions & 46 deletions

File tree

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,22 +243,46 @@ void testFilterCompletelyRejectsNonMatchingBatch() throws Exception {
243243
allNonMatchingData, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID);
244244
addRecordsToBucket(tb0, nonMatchingRecords);
245245

246+
// Send fetch and wait for the response. The server should filter out the
247+
// entire batch (max=5, filter is a>5) and return a filteredEndOffset.
246248
logFetcher.sendFetches();
247249

248-
retry(
249-
Duration.ofMinutes(1),
250-
() -> {
251-
// The fetch may complete even if all batches are filtered out
252-
// depending on implementation
253-
assertThat(logFetcher.hasAvailableFetches()).isTrue();
254-
});
250+
retry(Duration.ofMinutes(1), () -> assertThat(logFetcher.hasAvailableFetches()).isTrue());
255251

256252
ScanRecords records = logFetcher.collectFetch();
257253

258-
// For a batch where max value = 5 and filter is a > 5, the entire batch should be
259-
// filtered out at the server side. collectFetch() may omit the bucket entirely because
260-
// there are no user-visible records to return.
254+
// No user-visible records should be returned
261255
assertThat(records.records(tb0)).isEmpty();
256+
257+
// If the server hasn't advanced HW yet, the offset may still be 0 on the
258+
// first fetch. Use a retry loop with send-wait-collect to ensure the
259+
// filteredEndOffset propagates through the full RPC round-trip.
260+
if (logFetcher.getLogScannerStatus().getBucketOffset(tb0) == 0L) {
261+
retry(
262+
Duration.ofMinutes(1),
263+
() -> {
264+
logFetcher.sendFetches();
265+
retry(
266+
Duration.ofSeconds(30),
267+
() -> assertThat(logFetcher.hasAvailableFetches()).isTrue());
268+
logFetcher.collectFetch();
269+
270+
Long tb0Offset = logFetcher.getLogScannerStatus().getBucketOffset(tb0);
271+
assertThat(tb0Offset)
272+
.as(
273+
"Fetch offset must advance past filtered data, "
274+
+ "confirming filteredEndOffset survived RPC round-trip")
275+
.isGreaterThan(0L);
276+
});
277+
} else {
278+
// Offset already advanced on first fetch — filteredEndOffset RPC round-trip confirmed
279+
assertThat(logFetcher.getLogScannerStatus().getBucketOffset(tb0))
280+
.as("Fetch offset must advance past filtered data")
281+
.isGreaterThan(0L);
282+
}
283+
284+
// After offset advancement, fetcher should be clean
285+
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0);
262286
}
263287

264288
@Test
@@ -284,9 +308,8 @@ void testConsecutiveFetchesWithFilteredOffsetAdvancement() throws Exception {
284308
LogRecordBatchStatisticsTestUtils.createLogRecordsWithStatistics(
285309
MATCHING_DATA, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID));
286310

287-
// Use retry loop to handle the case where bucket 0's HW hasn't advanced yet
288-
// on the first fetch cycle. The filtered-empty response (with filteredEndOffset)
289-
// is only returned when the server has data up to HW to scan.
311+
// Flatten retry: send-wait-collect in a single loop until bucket 0's offset advances.
312+
// This avoids the fragile nested retry pattern that could accumulate in-flight requests.
290313
retry(
291314
Duration.ofMinutes(1),
292315
() -> {

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
6363
import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
6464
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
65+
import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX;
6566
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
6667
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
6768
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
@@ -150,6 +151,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
150151
toFlussTablePath(context.getObjectIdentifier()),
151152
toFlussClientConfig(
152153
context.getCatalogTable().getOptions(), context.getConfiguration()),
154+
toFlussTableConfig(tableOptions),
153155
tableOutputType,
154156
primaryKeyIndexes,
155157
bucketKeyIndexes,
@@ -273,6 +275,24 @@ private static Configuration toFlussClientConfig(
273275
return flussConfig;
274276
}
275277

278+
private static Configuration toFlussTableConfig(ReadableConfig tableOptions) {
279+
Configuration tableConfig = new Configuration();
280+
281+
// forward all table-level configs by iterating through known table options
282+
// this approach is safer than using toMap() which may not exist in all Flink versions
283+
for (ConfigOption<?> option : FlinkConnectorOptions.TABLE_OPTIONS) {
284+
if (option.key().startsWith(TABLE_PREFIX)) {
285+
Object value = tableOptions.getOptional(option).orElse(null);
286+
if (value != null) {
287+
// convert value to string for configuration storage
288+
tableConfig.setString(option.key(), value.toString());
289+
}
290+
}
291+
}
292+
293+
return tableConfig;
294+
}
295+
276296
private static TablePath toFlussTablePath(ObjectIdentifier tablePath) {
277297
return TablePath.of(tablePath.getDatabaseName(), tablePath.getObjectName());
278298
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
126126
isPartitioned,
127127
flussRowType,
128128
projectedFields,
129+
null,
129130
offsetsInitializer,
130131
scanPartitionDiscoveryIntervalMs,
131132
new BinlogDeserializationSchema(),

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
163163
isPartitioned(),
164164
flussRowType,
165165
projectedFields,
166+
null,
166167
offsetsInitializer,
167168
scanPartitionDiscoveryIntervalMs,
168169
new ChangelogDeserializationSchema(),

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,16 @@ public class FlinkSource<OUT>
7373
@Nullable private final LakeSource<LakeSplit> lakeSource;
7474
private final LeaseContext leaseContext;
7575

76+
@Nullable private final Predicate logRecordBatchFilter;
77+
7678
public FlinkSource(
7779
Configuration flussConf,
7880
TablePath tablePath,
7981
boolean hasPrimaryKey,
8082
boolean isPartitioned,
8183
RowType sourceOutputType,
8284
@Nullable int[] projectedFields,
85+
@Nullable Predicate logRecordBatchFilter,
8386
OffsetsInitializer offsetsInitializer,
8487
long scanPartitionDiscoveryIntervalMs,
8588
FlussDeserializationSchema<OUT> deserializationSchema,
@@ -93,6 +96,7 @@ public FlinkSource(
9396
isPartitioned,
9497
sourceOutputType,
9598
projectedFields,
99+
logRecordBatchFilter,
96100
offsetsInitializer,
97101
scanPartitionDiscoveryIntervalMs,
98102
deserializationSchema,
@@ -109,6 +113,7 @@ public FlinkSource(
109113
boolean isPartitioned,
110114
RowType sourceOutputType,
111115
@Nullable int[] projectedFields,
116+
@Nullable Predicate logRecordBatchFilter,
112117
OffsetsInitializer offsetsInitializer,
113118
long scanPartitionDiscoveryIntervalMs,
114119
FlussDeserializationSchema<OUT> deserializationSchema,
@@ -122,6 +127,7 @@ public FlinkSource(
122127
this.isPartitioned = isPartitioned;
123128
this.sourceOutputType = sourceOutputType;
124129
this.projectedFields = projectedFields;
130+
this.logRecordBatchFilter = logRecordBatchFilter;
125131
this.offsetsInitializer = offsetsInitializer;
126132
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
127133
this.deserializationSchema = deserializationSchema;
@@ -213,6 +219,7 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
213219
sourceOutputType,
214220
context,
215221
projectedFields,
222+
logRecordBatchFilter,
216223
flinkSourceReaderMetrics,
217224
recordEmitter,
218225
lakeSource);

0 commit comments

Comments (0)