diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java index 915bfa0339..94142edf99 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java @@ -243,22 +243,46 @@ void testFilterCompletelyRejectsNonMatchingBatch() throws Exception { allNonMatchingData, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID); addRecordsToBucket(tb0, nonMatchingRecords); + // Send fetch and wait for the response. The server should filter out the + // entire batch (max=5, filter is a>5) and return a filteredEndOffset. logFetcher.sendFetches(); - retry( - Duration.ofMinutes(1), - () -> { - // The fetch may complete even if all batches are filtered out - // depending on implementation - assertThat(logFetcher.hasAvailableFetches()).isTrue(); - }); + retry(Duration.ofMinutes(1), () -> assertThat(logFetcher.hasAvailableFetches()).isTrue()); ScanRecords records = logFetcher.collectFetch(); - // For a batch where max value = 5 and filter is a > 5, the entire batch should be - // filtered out at the server side. collectFetch() may omit the bucket entirely because - // there are no user-visible records to return. + // No user-visible records should be returned assertThat(records.records(tb0)).isEmpty(); + + // If the server hasn't advanced HW yet, the offset may still be 0 on the + // first fetch. Use a retry loop with send-wait-collect to ensure the + // filteredEndOffset propagates through the full RPC round-trip. 
+ if (logFetcher.getLogScannerStatus().getBucketOffset(tb0) == 0L) { + retry( + Duration.ofMinutes(1), + () -> { + logFetcher.sendFetches(); + retry( + Duration.ofSeconds(30), + () -> assertThat(logFetcher.hasAvailableFetches()).isTrue()); + logFetcher.collectFetch(); + + Long tb0Offset = logFetcher.getLogScannerStatus().getBucketOffset(tb0); + assertThat(tb0Offset) + .as( + "Fetch offset must advance past filtered data, " + + "confirming filteredEndOffset survived RPC round-trip") + .isGreaterThan(0L); + }); + } else { + // Offset already advanced on first fetch — filteredEndOffset RPC round-trip confirmed + assertThat(logFetcher.getLogScannerStatus().getBucketOffset(tb0)) + .as("Fetch offset must advance past filtered data") + .isGreaterThan(0L); + } + + // After offset advancement, fetcher should be clean + assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0); } @Test @@ -284,9 +308,8 @@ void testConsecutiveFetchesWithFilteredOffsetAdvancement() throws Exception { LogRecordBatchStatisticsTestUtils.createLogRecordsWithStatistics( MATCHING_DATA, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID)); - // Use retry loop to handle the case where bucket 0's HW hasn't advanced yet - // on the first fetch cycle. The filtered-empty response (with filteredEndOffset) - // is only returned when the server has data up to HW to scan. + // Flatten retry: send-wait-collect in a single loop until bucket 0's offset advances. + // This avoids the fragile nested retry pattern that could accumulate in-flight requests. 
retry( Duration.ofMinutes(1), () -> { diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java index 77f7909f24..855e748776 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java @@ -62,6 +62,7 @@ import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT; import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR; import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX; +import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX; import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes; import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys; @@ -150,6 +151,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { toFlussTablePath(context.getObjectIdentifier()), toFlussClientConfig( context.getCatalogTable().getOptions(), context.getConfiguration()), + toFlussTableConfig(tableOptions), tableOutputType, primaryKeyIndexes, bucketKeyIndexes, @@ -273,6 +275,24 @@ private static Configuration toFlussClientConfig( return flussConfig; } + private static Configuration toFlussTableConfig(ReadableConfig tableOptions) { + Configuration tableConfig = new Configuration(); + + // forward all table-level configs by iterating through known table options + // this approach is safer than using toMap() which may not exist in all Flink versions + for (ConfigOption option : FlinkConnectorOptions.TABLE_OPTIONS) { + if (option.key().startsWith(TABLE_PREFIX)) { + Object value = tableOptions.getOptional(option).orElse(null); + if (value != null) { + // convert value to 
string for configuration storage + tableConfig.setString(option.key(), value.toString()); + } + } + } + + return tableConfig; + } + private static TablePath toFlussTablePath(ObjectIdentifier tablePath) { return TablePath.of(tablePath.getDatabaseName(), tablePath.getObjectName()); } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java index 65083c3bef..901995a657 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/BinlogFlinkTableSource.java @@ -126,6 +126,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { isPartitioned, flussRowType, projectedFields, + null, offsetsInitializer, scanPartitionDiscoveryIntervalMs, new BinlogDeserializationSchema(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java index 0b6b4979b3..f58fac3df9 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/ChangelogFlinkTableSource.java @@ -163,6 +163,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { isPartitioned(), flussRowType, projectedFields, + null, offsetsInitializer, scanPartitionDiscoveryIntervalMs, new ChangelogDeserializationSchema(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java index 0aeb0d59cb..efa426e5f9 100644 --- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java @@ -73,6 +73,8 @@ public class FlinkSource @Nullable private final LakeSource lakeSource; private final LeaseContext leaseContext; + @Nullable private final Predicate logRecordBatchFilter; + public FlinkSource( Configuration flussConf, TablePath tablePath, @@ -80,6 +82,7 @@ public FlinkSource( boolean isPartitioned, RowType sourceOutputType, @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, OffsetsInitializer offsetsInitializer, long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema deserializationSchema, @@ -93,6 +96,7 @@ public FlinkSource( isPartitioned, sourceOutputType, projectedFields, + logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, deserializationSchema, @@ -109,6 +113,7 @@ public FlinkSource( boolean isPartitioned, RowType sourceOutputType, @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, OffsetsInitializer offsetsInitializer, long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema deserializationSchema, @@ -122,6 +127,7 @@ public FlinkSource( this.isPartitioned = isPartitioned; this.sourceOutputType = sourceOutputType; this.projectedFields = projectedFields; + this.logRecordBatchFilter = logRecordBatchFilter; this.offsetsInitializer = offsetsInitializer; this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.deserializationSchema = deserializationSchema; @@ -213,6 +219,7 @@ public SourceReader createReader(SourceReaderContext conte sourceOutputType, context, projectedFields, + logRecordBatchFilter, flinkSourceReaderMetrics, recordEmitter, lakeSource); diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java index 83b4dd29ed..a6301a31d6 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java @@ -28,6 +28,7 @@ import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; import org.apache.fluss.flink.utils.FlinkConversions; +import org.apache.fluss.flink.utils.PredicateConverter; import org.apache.fluss.flink.utils.PushdownUtils; import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual; import org.apache.fluss.lake.source.LakeSource; @@ -36,10 +37,12 @@ import org.apache.fluss.metadata.DeleteBehavior; import org.apache.fluss.metadata.MergeEngineType; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.CompoundPredicate; import org.apache.fluss.predicate.PartitionPredicateVisitor; import org.apache.fluss.predicate.Predicate; import org.apache.fluss.predicate.PredicateBuilder; import org.apache.fluss.predicate.PredicateVisitor; +import org.apache.fluss.types.DataTypeChecks; import org.apache.fluss.types.RowType; import org.apache.flink.annotation.VisibleForTesting; @@ -89,6 +92,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource; import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate; @@ -133,6 +138,12 @@ public class FlinkTableSource @Nullable private final MergeEngineType mergeEngineType; + // table-level configuration + private final Configuration tableConfig; + + // pre-computed available statistics columns + private final Set availableStatsColumns; + // output type after projection pushdown private LogicalType producedDataType; @@ -154,10 
+165,12 @@ public class FlinkTableSource private final Map tableOptions; @Nullable private LakeSource lakeSource; + @Nullable private Predicate logRecordBatchFilter; public FlinkTableSource( TablePath tablePath, Configuration flussConfig, + Configuration tableConfig, org.apache.flink.table.types.logical.RowType tableOutputType, int[] primaryKeyIndexes, int[] bucketKeyIndexes, @@ -197,6 +210,11 @@ public FlinkTableSource( createLakeSource(tablePath, tableOptions), "LakeSource must not be null if enable datalake"); } + this.tableConfig = checkNotNull(tableConfig, "tableConfig must not be null"); + + // Pre-compute available statistics columns to avoid repeated calculation + RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType); + this.availableStatsColumns = computeAvailableStatsColumns(flussRowType); } @Override @@ -340,6 +358,7 @@ public boolean isBounded() { isPartitioned(), flussRowType, projectedFields, + logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, new RowDataDeserializationSchema(), @@ -424,6 +443,7 @@ public DynamicTableSource copy() { new FlinkTableSource( tablePath, flussConfig, + tableConfig, tableOutputType, primaryKeyIndexes, bucketKeyIndexes, @@ -444,6 +464,8 @@ public DynamicTableSource copy() { source.modificationScanType = modificationScanType; source.partitionFilters = partitionFilters; source.lakeSource = lakeSource; + source.logRecordBatchFilter = logRecordBatchFilter; + // Note: availableStatsColumns is already computed in the constructor return source; } @@ -497,16 +519,12 @@ && hasPrimaryKey() lookupRow.setField(keyRowProjection[fieldEqual.fieldIndex], fieldEqual.equalValue); visitedPkFields.add(fieldEqual.fieldIndex); } - // if not all primary key fields are in condition, meaning can not push down single row - // filter, determine whether to push down partition filter later + // if not all primary key fields are in condition, fall through to + // try partition filter pushdown for partitioned PK 
tables if (visitedPkFields.equals(primaryKeyTypes.keySet())) { singleRowFilter = lookupRow; - // FLINK-38635 We cannot determine whether this source will ultimately be used as a - // scan - // source or a lookup source. Since fluss lookup sources cannot accept filters yet, - // to - // be safe, we return all filters to the Flink planner. + // FLINK-38635: return all filters as remaining for scan vs lookup safety return Result.of(acceptedFilters, filters); } } @@ -542,6 +560,7 @@ && hasPrimaryKey() } } partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted); + // lake source is not null if (lakeSource != null) { List lakePredicates = new ArrayList<>(); @@ -562,14 +581,156 @@ && hasPrimaryKey() } } } + } + + if (acceptedFilters.isEmpty() && remainingFilters.isEmpty()) { + remainingFilters.addAll(filters); + } + + if (!hasPrimaryKey()) { + Result recordBatchResult = pushdownRecordBatchFilter(remainingFilters); + acceptedFilters.addAll(recordBatchResult.getAcceptedFilters()); + } + // FLINK-38635 We cannot determine whether this source will ultimately be used as a + // scan source or a lookup source. If used as a lookup source, the accepted filters + // (partition filters, record batch filters) are not enforced in the lookup path. + // Always return all original filters as remaining so Flink applies them as a safety net. 
+ return Result.of(acceptedFilters, filters); + } + + private Result pushdownRecordBatchFilter(List filters) { + // Use pre-computed available statistics columns + LOG.trace("Statistics available columns: {}", availableStatsColumns); + + // Convert to fluss row type for predicate operations + RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType); + + List pushdownPredicates = new ArrayList<>(); + List acceptedFilters = new ArrayList<>(); + List remainingFilters = new ArrayList<>(); + + for (ResolvedExpression filter : filters) { + Optional predicateOpt = + PredicateConverter.convertToFlussPredicate(tableOutputType, filter); + + if (predicateOpt.isPresent()) { + Predicate predicate = predicateOpt.get(); + LOG.trace("Converted filter to predicate: {}", predicate); + // Check if predicate can benefit from statistics + if (canPredicateUseStatistics(predicate, flussRowType, availableStatsColumns)) { + pushdownPredicates.add(predicate); + acceptedFilters.add(filter); + } + } + // All filters are kept as remaining so that Flink can still verify the results + // after server-side filtering (safety net). + remainingFilters.add(filter); + } - // FLINK-38635 We cannot determine whether this source will ultimately be used as a scan - // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to - // be safe, we return all filters to the Flink planner. - return Result.of(acceptedFilters, filters); + if (!pushdownPredicates.isEmpty()) { + Predicate merged = + pushdownPredicates.size() == 1 + ? pushdownPredicates.get(0) + : PredicateBuilder.and(pushdownPredicates); + LOG.info("Accept merged predicate for record batch filter: {}", merged); + this.logRecordBatchFilter = merged; + } else { + this.logRecordBatchFilter = null; + } + return Result.of(acceptedFilters, remainingFilters); + } + + /** + * Checks if a predicate can benefit from statistics based on the available statistics columns. 
+ * + * @param predicate the predicate to check + * @param rowType the row type + * @param availableStatsColumns the columns that have statistics available + * @return true if the predicate can use statistics + */ + private boolean canPredicateUseStatistics( + Predicate predicate, RowType rowType, Set availableStatsColumns) { + + class StatisticsUsageVisitor implements PredicateVisitor { + @Override + public Boolean visit(org.apache.fluss.predicate.LeafPredicate leaf) { + // Check if the field referenced by this predicate has statistics available + String fieldName = rowType.getFieldNames().get(leaf.index()); + // Check if statistics are available for this column + return availableStatsColumns.contains(fieldName); + } + + @Override + public Boolean visit(CompoundPredicate compound) { + // For compound predicates, all children must be able to use statistics + for (Predicate child : compound.children()) { + if (!child.visit(this)) { + return false; + } + } + return true; + } } - return Result.of(Collections.emptyList(), filters); + return predicate.visit(new StatisticsUsageVisitor()); + } + + /** + * Computes the available statistics columns based on table configuration. This method is called + * once during construction to pre-compute the result. 
+ * + * @param flussRowType the row type + * @return set of column names that have statistics available + */ + private Set computeAvailableStatsColumns(RowType flussRowType) { + Set columns = new HashSet<>(); + + // Get the configured statistics columns + String columnsConfig = tableConfig.get(ConfigOptions.TABLE_STATISTICS_COLUMNS); + + // Check if statistics are enabled for the table + if (columnsConfig == null || columnsConfig.isEmpty()) { + LOG.debug("Statistics collection is disabled for the table"); + return columns; + } + + if ("*".equals(columnsConfig)) { + // Collect all columns with supported statistics types + for (int i = 0; i < flussRowType.getFieldCount(); i++) { + org.apache.fluss.types.DataType fieldType = flussRowType.getTypeAt(i); + if (DataTypeChecks.isSupportedStatisticsType(fieldType)) { + columns.add(flussRowType.getFieldNames().get(i)); + } + } + } else { + // Use user-specified columns (validate they exist and have supported types) + List configuredColumns = + Arrays.stream(columnsConfig.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + for (String columnName : configuredColumns) { + // Find the column in the row type + int columnIndex = flussRowType.getFieldNames().indexOf(columnName); + if (columnIndex >= 0) { + org.apache.fluss.types.DataType fieldType = flussRowType.getTypeAt(columnIndex); + if (DataTypeChecks.isSupportedStatisticsType(fieldType)) { + columns.add(columnName); + } else { + LOG.trace( + "Configured statistics column '{}' has unsupported type and will be ignored", + columnName); + } + } else { + LOG.trace( + "Configured statistics column '{}' does not exist in table schema", + columnName); + } + } + } + + return columns; } @Override @@ -652,4 +813,27 @@ public int[] getPrimaryKeyIndexes() { public int[] getBucketKeyIndexes() { return bucketKeyIndexes; } + + @VisibleForTesting + public int[] getPartitionKeyIndexes() { + return partitionKeyIndexes; + } + + @VisibleForTesting + 
@Nullable + public Predicate getLogRecordBatchFilter() { + return logRecordBatchFilter; + } + + @VisibleForTesting + @Nullable + public GenericRowData getSingleRowFilter() { + return singleRowFilter; + } + + @VisibleForTesting + @Nullable + public Predicate getPartitionFilters() { + return partitionFilters; + } } diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java index d00a74a26b..53cabcca79 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSource.java @@ -23,6 +23,7 @@ import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; import org.apache.fluss.flink.source.reader.LeaseContext; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; import javax.annotation.Nullable; @@ -65,6 +66,7 @@ public class FlussSource extends FlinkSource { boolean isPartitioned, RowType sourceOutputType, @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, OffsetsInitializer offsetsInitializer, long scanPartitionDiscoveryIntervalMs, FlussDeserializationSchema deserializationSchema, @@ -77,6 +79,7 @@ public class FlussSource extends FlinkSource { isPartitioned, sourceOutputType, projectedFields, + logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, deserializationSchema, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java index afd955c01f..90f460eb27 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlussSourceBuilder.java @@ -27,6 +27,7 @@ import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; import org.slf4j.Logger; @@ -68,6 +69,7 @@ public class FlussSourceBuilder { private int[] projectedFields; private String[] projectedFieldNames; + private Predicate logRecordBatchFilter; private Long scanPartitionDiscoveryIntervalMs; private OffsetsInitializer offsetsInitializer; private FlussDeserializationSchema deserializationSchema; @@ -174,6 +176,18 @@ public FlussSourceBuilder setProjectedFields(String... projectedFieldNames) return this; } + /** + * Sets the predicate used for server-side record batch filtering based on column statistics. + * + * @param logRecordBatchFilter the predicate to filter record batches + * @return this builder + */ + public FlussSourceBuilder setLogRecordBatchFilter(Predicate logRecordBatchFilter) { + checkNotNull(logRecordBatchFilter, "logRecordBatchFilter must not be null"); + this.logRecordBatchFilter = logRecordBatchFilter; + return this; + } + /** * Sets custom Fluss configuration properties for the source connector. 
* @@ -297,6 +311,7 @@ public FlussSource build() { isPartitioned, sourceOutputType, projectedFields, + logRecordBatchFilter, offsetsInitializer, scanPartitionDiscoveryIntervalMs, deserializationSchema, diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java index 9a6c4e8dad..68a3489bd8 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java @@ -35,6 +35,7 @@ import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; import org.apache.flink.api.connector.source.SourceEvent; @@ -69,6 +70,7 @@ public FlinkSourceReader( RowType sourceOutputType, SourceReaderContext context, @Nullable int[] projectedFields, + @Nullable Predicate logRecordBatchFilter, FlinkSourceReaderMetrics flinkSourceReaderMetrics, FlinkRecordEmitter recordEmitter, LakeSource lakeSource) { @@ -82,8 +84,9 @@ public FlinkSourceReader( tablePath, sourceOutputType, projectedFields, - flinkSourceReaderMetrics, - lakeSource), + logRecordBatchFilter, + lakeSource, + flinkSourceReaderMetrics), (ignore) -> {}), recordEmitter, context.getConfiguration(), diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java index bd5305a363..145423c9dc 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java +++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection; import org.apache.fluss.client.ConnectionFactory; import org.apache.fluss.client.table.Table; +import org.apache.fluss.client.table.scanner.Scan; import org.apache.fluss.client.table.scanner.ScanRecord; import org.apache.fluss.client.table.scanner.batch.BatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; @@ -39,6 +40,7 @@ import org.apache.fluss.lake.source.LakeSplit; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.ExceptionUtils; @@ -91,6 +93,7 @@ public class FlinkSourceSplitReader implements SplitReader subscribedBuckets; @Nullable private final int[] projectedFields; + private final FlinkSourceReaderMetrics flinkSourceReaderMetrics; @Nullable private BoundedSplitReader currentBoundedSplitReader; @@ -120,8 +123,9 @@ public FlinkSourceSplitReader( TablePath tablePath, RowType sourceOutputType, @Nullable int[] projectedFields, - FlinkSourceReaderMetrics flinkSourceReaderMetrics, - @Nullable LakeSource lakeSource) { + @Nullable Predicate logRecordBatchFilter, + @Nullable LakeSource lakeSource, + FlinkSourceReaderMetrics flinkSourceReaderMetrics) { this.flinkMetricRegistry = new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup()); this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry); @@ -134,7 +138,11 @@ public FlinkSourceSplitReader( this.flinkSourceReaderMetrics = flinkSourceReaderMetrics; sanityCheck(table.getTableInfo().getRowType(), projectedFields); - this.logScanner = table.newScan().project(projectedFields).createLogScanner(); + Scan tableScan = table.newScan().project(projectedFields); + if 
(logRecordBatchFilter != null) { + tableScan = tableScan.filter(logRecordBatchFilter); + } + this.logScanner = tableScan.createLogScanner(); this.stoppingOffsets = new HashMap<>(); this.emptyLogSplits = new HashSet<>(); this.lakeSource = lakeSource; diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java index 861381c146..7c691cea97 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java @@ -248,12 +248,12 @@ private Object extractLiteral(DataType expectedType, Expression expression) { throw new UnsupportedExpression(); } - private static Object fromFlinkObject(Object o, DataType type) { - if (o == null) { + private static Object fromFlinkObject(Object flinkValue, DataType type) { + if (flinkValue == null) { return null; } return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0) - .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o))); + .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(flinkValue))); } private boolean supportsPredicate(LogicalType type) { diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFilterPushDownTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFilterPushDownTest.java new file mode 100644 index 0000000000..25cce44906 --- /dev/null +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFilterPushDownTest.java @@ -0,0 +1,1063 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.FlinkConnectorOptions; +import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.predicate.Predicate; +import org.apache.fluss.row.BinaryString; +import org.apache.fluss.row.GenericRow; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Maps; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Comprehensive unit tests for filter 
pushdown functionality in FlinkTableSource. Tests cover + * various scenarios including streaming/batch modes, different table types, and different filter + * pushdown types (primary key, partition key, record batch filter). + */ +public class FlinkTableSourceFilterPushDownTest { + + @Nested + class NonPartitionedKvTableTests { + private FlinkTableSource tableSource; + + @BeforeEach + void setUp() { + // Create a non-partitioned KV table schema: id (PK), name, value + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT())) + .getLogicalType(); + + TablePath tablePath = TablePath.of("test_db", "test_kv_table"); + Configuration flussConfig = new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {0}, // primary key indexes (id) + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + true, // streaming + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testStreamingModeNonPrimaryKeyPushdown() { + // Test non-primary key filter in streaming mode for KV table + // KV tables should not support record batch filter pushdown for non-primary key fields + FieldReferenceExpression fieldRef = + 
new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + ValueLiteralExpression literal = new ValueLiteralExpression("test"); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // For KV tables, non-primary key filters should not be pushed down + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + + @Test + void testStreamingModeMultipleFilters() { + // Test multiple filters in streaming mode for KV table + // KV tables should not support record batch filter pushdown + FieldReferenceExpression idFieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + FieldReferenceExpression nameFieldRef = + new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + ValueLiteralExpression idLiteral = new ValueLiteralExpression(5); + ValueLiteralExpression nameLiteral = new ValueLiteralExpression("test"); + + CallExpression idEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(idFieldRef, idLiteral), + DataTypes.BOOLEAN()); + + CallExpression nameEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(nameFieldRef, nameLiteral), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(idEqualCall, nameEqualCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // For KV tables in streaming mode, no filters should be pushed down + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(2); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + } + + @Nested + class NonPartitionedLogTableTests { + private FlinkTableSource tableSource; + + @BeforeEach + void setUp() { 
+ // Create a non-partitioned log table schema: id, name, value (no primary key) + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT()), + DataTypes.FIELD("region", DataTypes.STRING())) + .getLogicalType(); + + TablePath tablePath = TablePath.of("test_db", "test_log_table"); + Configuration flussConfig = new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {}, // no primary key indexes + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + true, // streaming + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testLogTableRecordBatchFilterPushdown() { + // Test record batch filter pushdown for log table + // Log tables should support record batch filter pushdown + FieldReferenceExpression valueFieldRef = + new FieldReferenceExpression("value", DataTypes.BIGINT(), 0, 2); + FieldReferenceExpression regionFieldRef = + new FieldReferenceExpression("region", DataTypes.STRING(), 0, 3); + + CallExpression regionEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(regionFieldRef, new ValueLiteralExpression("HangZhou")), + DataTypes.BOOLEAN()); + 
CallExpression valueRangeCall = + new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(valueFieldRef, new ValueLiteralExpression(1000L)), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(regionEqualCall, valueRangeCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + assertThat(result.getAcceptedFilters()).hasSize(2); + assertThat(result.getRemainingFilters()).hasSize(2); + Predicate predicate = tableSource.getLogRecordBatchFilter(); + assertThat(predicate).isNotNull(); + assertThat(tableSource.getSingleRowFilter()).isNull(); + + // Verify the predicate evaluates correctly against statistics. + // Schema: id(INT), name(STRING), value(BIGINT), region(STRING) + // Filter: region = 'HangZhou' AND value > 1000 + int fieldCount = 4; + Long[] noNulls = new Long[] {0L, 0L, 0L, 0L}; + + // Batch with value range [2000, 5000] and region ['HangZhou', 'HangZhou'] + // → should match + assertThat( + predicate.test( + 100, + GenericRow.of( + null, null, 2000L, BinaryString.fromString("HangZhou")), + GenericRow.of( + null, null, 5000L, BinaryString.fromString("HangZhou")), + noNulls)) + .isTrue(); + + // Batch with value range [100, 500] → should NOT match (max=500, not > 1000) + assertThat( + predicate.test( + 100, + GenericRow.of( + null, null, 100L, BinaryString.fromString("HangZhou")), + GenericRow.of( + null, null, 500L, BinaryString.fromString("HangZhou")), + noNulls)) + .isFalse(); + + // Batch with region ['Beijing', 'Beijing'] → should NOT match + assertThat( + predicate.test( + 100, + GenericRow.of( + null, null, 2000L, BinaryString.fromString("Beijing")), + GenericRow.of( + null, null, 5000L, BinaryString.fromString("Beijing")), + noNulls)) + .isFalse(); + } + + @Test + void testLogTableRangeFilterPushdown() { + // Test range filter pushdown for log table + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal1 = new 
ValueLiteralExpression(3); + ValueLiteralExpression literal2 = new ValueLiteralExpression(10); + + CallExpression greaterThanCall = + new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(fieldRef, literal1), + DataTypes.BOOLEAN()); + + CallExpression lessThanCall = + new CallExpression( + BuiltInFunctionDefinitions.LESS_THAN, + Arrays.asList(fieldRef, literal2), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(greaterThanCall, lessThanCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Range filters should be pushed down as record batch filter + assertThat(result.getAcceptedFilters()).hasSize(2); + assertThat(result.getRemainingFilters()).hasSize(2); + Predicate predicate = tableSource.getLogRecordBatchFilter(); + assertThat(predicate).isNotNull(); + + // Verify the predicate actually evaluates correctly against statistics. + // Schema: id(INT), name(STRING), value(BIGINT), region(STRING) + // Filter: id > 3 AND id < 10 + int fieldCount = 4; + Long[] noNulls = new Long[] {0L, 0L, 0L, 0L}; + + // Batch with id range [5, 8] → should match (5 > 3 is possible, 8 < 10 is possible) + assertThat( + predicate.test( + 100, + GenericRow.of(5, null, null, null), + GenericRow.of(8, null, null, null), + noNulls)) + .isTrue(); + + // Batch with id range [1, 2] → should NOT match (max=2, not > 3) + assertThat( + predicate.test( + 100, + GenericRow.of(1, null, null, null), + GenericRow.of(2, null, null, null), + noNulls)) + .isFalse(); + + // Batch with id range [11, 20] → should NOT match (min=11, not < 10) + assertThat( + predicate.test( + 100, + GenericRow.of(11, null, null, null), + GenericRow.of(20, null, null, null), + noNulls)) + .isFalse(); + } + + @Test + void testEmptyFilters() { + // Test with empty filters + List filters = Collections.emptyList(); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // No filters should be accepted or remain + 
assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).isEmpty(); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + } + + @Nested + class PartitionedKvTableTests { + private FlinkTableSource tableSource; + + @BeforeEach + void setUp() { + // Create a partitioned KV table schema: id (PK), name, value, region (partition key) + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT()), + DataTypes.FIELD("region", DataTypes.STRING())) + .getLogicalType(); + + TablePath tablePath = TablePath.of("test_db", "test_partitioned_kv_table"); + Configuration flussConfig = new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {0}, // primary key indexes (id) + new int[] {}, // bucket key indexes + new int[] {3}, // partition key indexes (region) + true, // streaming + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testPartitionKeyPushdown() { + // Test partition key pushdown + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("region", DataTypes.STRING(), 0, 3); + ValueLiteralExpression literal = + new ValueLiteralExpression("us-east", 
DataTypes.STRING().notNull()); +            CallExpression equalCall = +                    new CallExpression( +                            BuiltInFunctionDefinitions.EQUALS, +                            Arrays.asList(fieldRef, literal), +                            DataTypes.BOOLEAN()); + +            List filters = Arrays.asList(equalCall); + +            FlinkTableSource.Result result = tableSource.applyFilters(filters); + +            // Partition key filters should be pushed down as partition filters +            assertThat(result.getAcceptedFilters()).hasSize(1); +            // FLINK-38635: all filters remain for safety (scan vs lookup ambiguity) +            assertThat(result.getRemainingFilters()).hasSize(1); +            assertThat(tableSource.getPartitionFilters()).isNotNull(); +        } + +        @Test +        void testRecordBatchFilterNotPushedDownInKvTable() { +            // Test that a non-primary-key column filter is NOT pushed down as a record batch filter for KV tables +            FieldReferenceExpression fieldRef = +                    new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); +            ValueLiteralExpression literal = +                    new ValueLiteralExpression("bob", DataTypes.STRING().notNull()); +            CallExpression equalCall = +                    new CallExpression( +                            BuiltInFunctionDefinitions.EQUALS, +                            Arrays.asList(fieldRef, literal), +                            DataTypes.BOOLEAN()); + +            List filters = Arrays.asList(equalCall); + +            FlinkTableSource.Result result = tableSource.applyFilters(filters); + +            assertThat(result.getAcceptedFilters()).isEmpty(); +            assertThat(result.getRemainingFilters()).hasSize(1); +            assertThat(tableSource.getLogRecordBatchFilter()).isNull(); +        } +    } + +    @Nested +    class PartitionedLogTableTests { +        private FlinkTableSource tableSource; + +        @BeforeEach +        void setUp() { +            // Create a partitioned log table schema: id, name, value, region (partition key) +            RowType tableOutputType = +                    (RowType) +                            DataTypes.ROW( +                                            DataTypes.FIELD("id", DataTypes.INT()), +                                            DataTypes.FIELD("name", DataTypes.STRING()), +                                            DataTypes.FIELD("value", DataTypes.BIGINT()), +                                            DataTypes.FIELD("region", DataTypes.STRING()), +                                            DataTypes.FIELD("attachment", DataTypes.BYTES())) +                                    .getLogicalType(); + +            TablePath tablePath = TablePath.of("test_db", "test_partitioned_log_table"); +            Configuration flussConfig = 
new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {}, // no primary key indexes + new int[] {}, // bucket key indexes + new int[] {3}, // partition key indexes (region) + true, // streaming + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testPartitionedLogTablePartitionKeyPushdown() { + // Test partition key pushdown for partitioned log table + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("region", DataTypes.STRING(), 0, 3); + ValueLiteralExpression literal = + new ValueLiteralExpression("us-east", DataTypes.STRING().notNull()); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Partition key filters should be pushed down as partition filters + assertThat(result.getAcceptedFilters()).hasSize(1); + // FLINK-38635: all filters remain for safety (scan vs lookup ambiguity) + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getPartitionFilters()).isNotNull(); + } + + @Test + void testPartitionedLogTableRecordBatchFilterPushdown() { + // Test record batch filter pushdown 
for partitioned log table + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + assertThat(result.getAcceptedFilters()).hasSize(1); + assertThat(result.getRemainingFilters()).hasSize(1); + // record batch filter should be successfully pushdown + assertThat(tableSource.getLogRecordBatchFilter()).isNotNull(); + } + + @Test + void testCopyPreservesRecordBatchFilters() { + // Test that copying preserves all filter states + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + tableSource.applyFilters(filters); + + // Verify original has filters + Predicate originalFilter = tableSource.getLogRecordBatchFilter(); + assertThat(originalFilter).isNotNull(); + + // Copy the table source + FlinkTableSource copiedSource = (FlinkTableSource) tableSource.copy(); + + // Verify copied source has the same filters + Predicate copiedFilter = copiedSource.getLogRecordBatchFilter(); + assertThat(copiedFilter).isNotNull(); + assertThat(copiedFilter).isEqualTo(originalFilter); + } + + @Test + void testUnsupportedRecordBatchFilterColumnTypes() { + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("attachment", DataTypes.BYTES(), 0, 4); + ValueLiteralExpression literal = + new ValueLiteralExpression(new byte[] {}, DataTypes.BYTES().notNull()); + CallExpression equalCall = + new CallExpression( + 
BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Unsupported column should not be pushed down as record batch filter + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + + @Test + void testMixedFilterTypesPushdown() { + // Test mixed filter types: partition key + regular column + FieldReferenceExpression regionFieldRef = + new FieldReferenceExpression("region", DataTypes.STRING(), 0, 3); + FieldReferenceExpression idFieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + FieldReferenceExpression nameFieldRef = + new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + ValueLiteralExpression regionLiteral = + new ValueLiteralExpression("us-east", DataTypes.STRING().notNull()); + ValueLiteralExpression idLiteral = new ValueLiteralExpression(5); + ValueLiteralExpression nameLiteral = + new ValueLiteralExpression("test", DataTypes.STRING().notNull()); + + CallExpression regionEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(regionFieldRef, regionLiteral), + DataTypes.BOOLEAN()); + + CallExpression idEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(idFieldRef, idLiteral), + DataTypes.BOOLEAN()); + + CallExpression nameEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(nameFieldRef, nameLiteral), + DataTypes.BOOLEAN()); + + List filters = + Arrays.asList(regionEqualCall, idEqualCall, nameEqualCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Partition key should be pushed down as partition filter + // Regular columns should be pushed down as record batch filter + 
assertThat(result.getAcceptedFilters()).hasSize(3); + // FLINK-38635: all filters remain for safety (scan vs lookup ambiguity) + assertThat(result.getRemainingFilters()).hasSize(3); + assertThat(tableSource.getPartitionFilters()).isNotNull(); + assertThat(tableSource.getLogRecordBatchFilter()) + .isNotNull(); // Record batch filter pushed down for non-PK partitioned log + // table + } + } + + @Nested + class PartialStatisticsColumnTests { + private FlinkTableSource tableSource; + + @BeforeEach + void setUp() { + // Create a log table schema: id, name, value, region (no primary key) + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT()), + DataTypes.FIELD("region", DataTypes.STRING()), + DataTypes.FIELD("attachment", DataTypes.BYTES())) + .getLogicalType(); + + TablePath tablePath = TablePath.of("test_db", "test_partial_stats_table"); + Configuration flussConfig = new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + // Create table config with only partial columns for statistics + Configuration tableConfig = new Configuration(); + // Only enable statistics for 'id' and 'value' columns, not for 'name', 'region', + // 'attachment' + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "id,value"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {}, // no primary key indexes + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + true, // streaming + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + 
false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testFilterOnColumnWithStatistics() { + // Test filter on 'id' column which has statistics enabled + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Filter on 'id' should be pushed down as record batch filter since it has statistics + assertThat(result.getAcceptedFilters()).hasSize(1); + assertThat(result.getRemainingFilters()).hasSize(1); // Filter remains for execution + assertThat(tableSource.getLogRecordBatchFilter()).isNotNull(); + } + + @Test + void testFilterOnColumnWithoutStatistics() { + // Test filter on 'name' column which does not have statistics enabled + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + ValueLiteralExpression literal = new ValueLiteralExpression("test"); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Filter on 'name' should not be pushed down since it doesn't have statistics + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + + @Test + void testFilterOnBinaryColumnExcludedFromStatistics() { + // Test filter on 'attachment' column which is binary type (excluded from statistics) + FieldReferenceExpression fieldRef = + new 
FieldReferenceExpression("attachment", DataTypes.BYTES(), 0, 4); + ValueLiteralExpression literal = + new ValueLiteralExpression(new byte[] {1, 2, 3}, DataTypes.BYTES().notNull()); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Binary columns should not have pushdown even if configured + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + + @Test + void testMixedFiltersWithPartialStatistics() { + // Test mixed filters: one on column with statistics, one without + FieldReferenceExpression idFieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + FieldReferenceExpression nameFieldRef = + new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + FieldReferenceExpression valueFieldRef = + new FieldReferenceExpression("value", DataTypes.BIGINT(), 0, 2); + + ValueLiteralExpression idLiteral = new ValueLiteralExpression(5); + ValueLiteralExpression nameLiteral = new ValueLiteralExpression("test"); + ValueLiteralExpression valueLiteral = new ValueLiteralExpression(100L); + + CallExpression idEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(idFieldRef, idLiteral), + DataTypes.BOOLEAN()); + + CallExpression nameEqualCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(nameFieldRef, nameLiteral), + DataTypes.BOOLEAN()); + + CallExpression valueGreaterCall = + new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(valueFieldRef, valueLiteral), + DataTypes.BOOLEAN()); + + List filters = + Arrays.asList(idEqualCall, nameEqualCall, valueGreaterCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + 
// Only filters on 'id' and 'value' should be accepted for pushdown + assertThat(result.getAcceptedFilters()).hasSize(2); // id and value filters + assertThat(result.getRemainingFilters()).hasSize(3); // all filters remain for execution + assertThat(tableSource.getLogRecordBatchFilter()) + .isNotNull(); // Should have merged predicate for id and value + } + + @Test + void testRangeFilterOnColumnWithStatistics() { + // Test range filter on 'value' column which has statistics enabled + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("value", DataTypes.BIGINT(), 0, 2); + ValueLiteralExpression lowerBound = new ValueLiteralExpression(10L); + ValueLiteralExpression upperBound = new ValueLiteralExpression(100L); + + CallExpression greaterThanCall = + new CallExpression( + BuiltInFunctionDefinitions.GREATER_THAN, + Arrays.asList(fieldRef, lowerBound), + DataTypes.BOOLEAN()); + + CallExpression lessThanCall = + new CallExpression( + BuiltInFunctionDefinitions.LESS_THAN, + Arrays.asList(fieldRef, upperBound), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(greaterThanCall, lessThanCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Range filters on 'value' should be pushed down since it has statistics + assertThat(result.getAcceptedFilters()).hasSize(2); + assertThat(result.getRemainingFilters()).hasSize(2); // Filters remain for execution + assertThat(tableSource.getLogRecordBatchFilter()).isNotNull(); + } + + @Test + void testEmptyStatisticsConfiguration() { + // Test with empty statistics configuration + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT())) + .getLogicalType(); + + Configuration tableConfig = new Configuration(); + // No statistics columns configured + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, ""); + + FlinkTableSource emptyStatsTableSource = + 
new FlinkTableSource( + TablePath.of("test_db", "test_empty_stats_table"), + new Configuration(), + tableConfig, + tableOutputType, + new int[] {}, // no primary key indexes + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + true, // streaming + new FlinkConnectorOptionsUtils.StartupOptions(), + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = emptyStatsTableSource.applyFilters(filters); + + // No filters should be pushed down when statistics are disabled + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(emptyStatsTableSource.getLogRecordBatchFilter()).isNull(); + } + } + + @Nested + class BatchModePrimaryKeyPushdownTests { + private FlinkTableSource tableSource; + + @BeforeEach + void setUp() { + // Create a KV table schema for batch mode testing + RowType tableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT())) + .getLogicalType(); + + TablePath tablePath = TablePath.of("test_db", "test_batch_table"); + Configuration flussConfig = new Configuration(); + flussConfig.setString(FlinkConnectorOptions.BOOTSTRAP_SERVERS.key(), "localhost:9092"); + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + 
tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.FULL; + + tableSource = + new FlinkTableSource( + tablePath, + flussConfig, + tableConfig, + tableOutputType, + new int[] {0}, // primary key indexes (id) + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + false, // batch mode + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + } + + @Test + void testBatchModePrimaryKeyPushdown() { + // Test primary key pushdown in batch mode with FULL startup mode + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // In batch mode with FULL startup and complete primary key filter, + // should use single row filter for point lookup + assertThat(result.getAcceptedFilters()).hasSize(1); + // FLINK-38635: all filters remain for safety (scan vs lookup ambiguity) + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getSingleRowFilter()).isNotNull(); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + + // Verify the single row filter contains the correct value + GenericRowData singleRowFilter = tableSource.getSingleRowFilter(); + assertThat(singleRowFilter.getInt(0)).isEqualTo(5); + } + + @Test + void testBatchModeIncompletePrimaryKeyPushdown() { + // Test 
incomplete primary key filter in batch mode (should not push down) + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("name", DataTypes.STRING(), 0, 1); + ValueLiteralExpression literal = new ValueLiteralExpression("test"); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = tableSource.applyFilters(filters); + + // Non-primary key filters should not be pushed down in batch mode + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(tableSource.getSingleRowFilter()).isNull(); + assertThat(tableSource.getLogRecordBatchFilter()).isNull(); + } + + @Test + void testBatchModeNonFullStartupMode() { + // Test batch mode with non-FULL startup mode + FlinkConnectorOptionsUtils.StartupOptions startupOptions = + new FlinkConnectorOptionsUtils.StartupOptions(); + startupOptions.startupMode = FlinkConnectorOptions.ScanStartupMode.EARLIEST; + + // Create a new table output type for the non-full startup test + RowType nonFullStartupTableOutputType = + (RowType) + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("value", DataTypes.BIGINT())) + .getLogicalType(); + + // Create table config for testing + Configuration tableConfig = new Configuration(); + + tableConfig.set(ConfigOptions.TABLE_STATISTICS_COLUMNS, "*"); + + FlinkTableSource nonFullStartupTableSource = + new FlinkTableSource( + TablePath.of("test_db", "test_batch_table"), + new Configuration(), + tableConfig, + nonFullStartupTableOutputType, + new int[] {0}, // primary key indexes + new int[] {}, // bucket key indexes + new int[] {}, // partition key indexes + false, // batch mode + startupOptions, + false, // lookup async + false, // insert if not exists + null, // cache + 1000L, // scan 
partition discovery interval + false, // is data lake enabled + null, // merge engine type + Maps.newHashMap(), + null); // lease context + + FieldReferenceExpression fieldRef = + new FieldReferenceExpression("id", DataTypes.INT(), 0, 0); + ValueLiteralExpression literal = new ValueLiteralExpression(5); + CallExpression equalCall = + new CallExpression( + BuiltInFunctionDefinitions.EQUALS, + Arrays.asList(fieldRef, literal), + DataTypes.BOOLEAN()); + + List filters = Arrays.asList(equalCall); + + FlinkTableSource.Result result = nonFullStartupTableSource.applyFilters(filters); + + // With non-FULL startup mode, should not use single row filter + assertThat(result.getAcceptedFilters()).isEmpty(); + assertThat(result.getRemainingFilters()).hasSize(1); + assertThat(nonFullStartupTableSource.getSingleRowFilter()).isNull(); + } + } +} diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java index 224798443d..6ec3bac431 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java @@ -71,7 +71,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder; import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout; @@ -130,7 +129,7 @@ void before() { tEnv.executeSql( String.format( "create catalog %s with ('type' = 'fluss', '%s' = '%s')", - CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers)); + CATALOG_NAME, 
ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers)); tEnv.executeSql("use catalog " + CATALOG_NAME); tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4); tEnv.executeSql("create database " + DEFAULT_DB); @@ -1312,7 +1311,7 @@ void testStreamingReadWithCombinedFilters1() throws Exception { + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], " + "project=[a, c, d]]], fields=[a, c, d])"); - // test column filter、partition filter and flink runtime filter + // test column filter, partition filter and flink runtime filter org.apache.flink.util.CloseableIterator rowIter = tEnv.executeSql( "select a,c,d from combined_filters_table where c ='2025' and d % 200 = 0") @@ -1329,7 +1328,7 @@ void testStreamingReadWithCombinedFilters1() throws Exception { + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], " + "project=[a, c, d]]], fields=[a, c, d])"); - // test column filter、partition filter and flink runtime filter + // test column filter, partition filter and flink runtime filter rowIter = tEnv.executeSql( "select a,c,d from combined_filters_table where c ='2025' and d = 200") @@ -1356,6 +1355,139 @@ void testNonPartitionPushDown() throws Exception { assertResultsIgnoreOrder(rowIter, expectedRowValues, true); } + @Test + void testStreamingReadNonPKTableWithCombinedFilters() throws Exception { + tEnv.executeSql( + "create table combined_filters_table" + + " (a int not null, b varchar, c string, d int) partitioned by (c)" + + " with ('table.statistics.columns' = 'd')"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "combined_filters_table"); + tEnv.executeSql("alter table combined_filters_table add partition (c=2025)"); + tEnv.executeSql("alter table combined_filters_table add partition (c=2026)"); + + List rows = new ArrayList<>(); + List expectedRowValues = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i, "2025", i * 100)); + if (i > 2) { + 
expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100)); + } + } + writeRows(conn, tablePath, rows, true); + + for (int i = 0; i < 10; i++) { + rows.add(row(i, "v" + i, "2026", i * 100)); + } + + writeRows(conn, tablePath, rows, true); + + String plan = + tEnv.explainSql( + "select a,c,d from combined_filters_table where c ='2025' and d > 200"); + + // assert both partition filter and record batch filter are pushed down + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table, " + + "filter=[and(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), >(d, 200))], " + + "project=[a, c, d]]], fields=[a, c, d])"); + // assert predicates are still retained in the Calc operator (FLINK-38635 safety net) + assertThat(plan).contains("where="); + + // test column filter, partition filter and flink runtime filter + org.apache.flink.util.CloseableIterator rowIter = + tEnv.executeSql( + "select a,c,d from combined_filters_table where c ='2025' and d > 200;") + .collect(); + + assertResultsIgnoreOrder(rowIter, expectedRowValues, true); + } + + /** + * Test RecordBatchFilter push down with large dataset and multiple range filters to verify + * correctness when log segments have gaps due to filtering. 
+ */ + @Test + void testRecordBatchFilterPushDownWithLogGaps() throws Exception { + // Create a log table for testing record batch filter push down with specific batch size + // configurations + // to ensure predictable RecordBatch sizes during writing and reading + tEnv.executeSql( + "create table record_batch_filter_test " + + "(id int, sequence_num int, name varchar, score double) " + + "with (" + + "'table.log.format' = 'ARROW', " + + "'table.statistics.columns' = 'sequence_num', " + // Writer configuration: aim for ~500 records per RecordBatch + // Each record has: int(4) + int(4) + varchar(~12) + double(8) ≈ 28 bytes + // So 500 records ≈ 14KB, set batch size to 32KB to account for overhead + + "'client.writer.batch-size' = '32kb', " + // Shorter timeout for predictable batching + + "'client.writer.batch-timeout' = '10ms', " + // limit read batch size to 500 records max + + "'client.scanner.log.max-poll-records' = '500', " + + "'client.scanner.log.fetch.max-bytes-for-bucket' = '32kb' " + + ")"); + + TablePath tablePath = TablePath.of(DEFAULT_DB, "record_batch_filter_test"); + + // Write 10,000 ordered rows with sequential sequence_num in controlled batches + // to ensure predictable RecordBatch sizes + int totalRecords = 10000; + int batchSize = 500; + + for (int batchStart = 0; batchStart < totalRecords; batchStart += batchSize) { + List batchRows = new ArrayList<>(); + int batchEnd = Math.min(batchStart + batchSize, totalRecords); + + for (int i = batchStart; i < batchEnd; i++) { + batchRows.add(row(i, i, "value_" + i, i * 0.1)); + } + + // Write each batch separately to ensure proper RecordBatch formation + writeRows(conn, tablePath, batchRows, true); + } + + // Define three ranges to filter: 5500-5999, 8350-8400, 9400-9999 + String query = + "select id, sequence_num, name from record_batch_filter_test " + + "where (sequence_num >= 5500 and sequence_num < 6000) " + + "or (sequence_num >= 8350 and sequence_num <= 8400) " + + "or (sequence_num >= 9400 and 
sequence_num < 10000)"; + + // Verify that record batch filters are pushed down + String plan = tEnv.explainSql(query); + assertThat(plan) + .contains( + "TableSourceScan(table=[[testcatalog, defaultdb, record_batch_filter_test, " + + "filter=[OR(OR(AND(>=(sequence_num, 5500), <(sequence_num, 6000)), " + + "AND(>=(sequence_num, 8350), <=(sequence_num, 8400))), " + + "AND(>=(sequence_num, 9400), <(sequence_num, 10000)))]"); + + // Collect results and verify correctness + List expectedResults = new ArrayList<>(); + + // Range 1: 5500-5999 (500 records) + for (int i = 5500; i < 6000; i++) { + expectedResults.add(String.format("+I[%d, %d, value_%d]", i, i, i)); + } + + // Range 2: 8350-8400 (51 records) + for (int i = 8350; i <= 8400; i++) { + expectedResults.add(String.format("+I[%d, %d, value_%d]", i, i, i)); + } + + // Range 3: 9400-9999 (600 records) + for (int i = 9400; i < 10000; i++) { + expectedResults.add(String.format("+I[%d, %d, value_%d]", i, i, i)); + } + + try (CloseableIterator rowIter = tEnv.executeSql(query).collect()) { + assertResultsIgnoreOrder(rowIter, expectedResults, true); + } + } + private List writeRowsToTwoPartition(TablePath tablePath, Collection partitions) throws Exception { List rows = new ArrayList<>(); @@ -1455,7 +1587,7 @@ void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception { .contains( "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])"); - // test column filter、partition filter and flink runtime filter + // test column filter, partition filter and flink runtime filter org.apache.flink.util.CloseableIterator rowIter = tEnv.executeSql(query1).collect(); assertResultsIgnoreOrder(rowIter, expectedRowValues, true); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java index bf97922515..556a81e69d 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java @@ -183,6 +183,7 @@ private FlinkSourceReader createReader( sourceOutputType, context, null, + null, new FlinkSourceReaderMetrics(context.metricGroup()), recordEmitter, lakeSource); diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java index 59f1116eda..c170518995 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -89,8 +89,9 @@ void testSanityCheck() throws Exception { "id", DataTypes.BIGINT().copy(false)), DataTypes.FIELD("name", DataTypes.STRING())), new int[] {1, 0}, - createMockSourceReaderMetrics(), - null)) + null, + null, + createMockSourceReaderMetrics())) .isInstanceOf(ValidationException.class) .hasMessage( "The Flink query schema is not matched to Fluss table schema. 
\n" @@ -106,8 +107,9 @@ void testSanityCheck() throws Exception { DataTypes.FIELD("name2", DataTypes.STRING()), DataTypes.FIELD("id", DataTypes.BIGINT())), null, - createMockSourceReaderMetrics(), - null)) + null, + null, + createMockSourceReaderMetrics())) .isInstanceOf(IllegalArgumentException.class) .hasMessage("Field name2 does not exist in the row type."); @@ -119,8 +121,9 @@ void testSanityCheck() throws Exception { DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("id", DataTypes.BIGINT().copy(false))), null, - createMockSourceReaderMetrics(), - null); + null, + null, + createMockSourceReaderMetrics()); assertThat(flinkSourceSplitReader.getProjectedFields()).isNull(); } @@ -427,7 +430,7 @@ private void assignSplits(FlinkSourceSplitReader splitReader, List`, `>`, `>=`, `<`, `<=` +- `IN (...)` +- `IS NULL`, `IS NOT NULL` +- `BETWEEN ... AND ...` +- `LIKE 'abc%'` (prefix), `LIKE '%abc'` (suffix), `LIKE '%abc%'` (contains) +- `AND` / `OR` conjunctions + +All columns referenced in a filter expression must have statistics enabled. If any referenced column lacks statistics, that filter will not be pushed down. + +#### Example + +**1. Create a table with statistics enabled:** +```sql title="Flink SQL" +CREATE TABLE sensor_data ( + sensor_id INT NOT NULL, + temperature DOUBLE NOT NULL, + humidity DOUBLE NOT NULL, + location STRING NOT NULL, + ts TIMESTAMP NOT NULL +) WITH ( + 'table.statistics.columns' = 'temperature,location' +); +``` + +**2. Query with filter:** +```sql title="Flink SQL" +SELECT * FROM sensor_data WHERE temperature > 30.0 AND location = 'warehouse-A'; +``` + +**3. Verify with `EXPLAIN`:** +```sql title="Flink SQL" +EXPLAIN SELECT * FROM sensor_data WHERE temperature > 30.0 AND location = 'warehouse-A'; +``` + +If filter pushdown is active, the `TableSourceScan` node in the execution plan will contain a `filter=[...]` clause showing the pushed-down predicates. 
For example: + +```text +TableSourceScan(table=[[..., sensor_data, filter=[and(>(temperature, 30.0:DOUBLE), =(location, ...))]]], fields=[...]) +``` + +The server evaluates these predicates against per-batch column statistics and skips entire record batches that cannot contain matching rows. Note that the filter also appears in a `Calc` node above the source — this is expected because Flink retains all filters for client-side verification as a safety net. + ## Batch Read ### Limit Read