Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,22 +243,46 @@ void testFilterCompletelyRejectsNonMatchingBatch() throws Exception {
allNonMatchingData, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID);
addRecordsToBucket(tb0, nonMatchingRecords);

// Send fetch and wait for the response. The server should filter out the
// entire batch (max=5, filter is a>5) and return a filteredEndOffset.
logFetcher.sendFetches();

retry(
Duration.ofMinutes(1),
() -> {
// The fetch may complete even if all batches are filtered out
// depending on implementation
assertThat(logFetcher.hasAvailableFetches()).isTrue();
});
retry(Duration.ofMinutes(1), () -> assertThat(logFetcher.hasAvailableFetches()).isTrue());

ScanRecords records = logFetcher.collectFetch();

// For a batch where max value = 5 and filter is a > 5, the entire batch should be
// filtered out at the server side. collectFetch() may omit the bucket entirely because
// there are no user-visible records to return.
// No user-visible records should be returned
assertThat(records.records(tb0)).isEmpty();

// If the server hasn't advanced HW yet, the offset may still be 0 on the
// first fetch. Use a retry loop with send-wait-collect to ensure the
// filteredEndOffset propagates through the full RPC round-trip.
if (logFetcher.getLogScannerStatus().getBucketOffset(tb0) == 0L) {
retry(
Duration.ofMinutes(1),
() -> {
logFetcher.sendFetches();
retry(
Duration.ofSeconds(30),
() -> assertThat(logFetcher.hasAvailableFetches()).isTrue());
logFetcher.collectFetch();

Long tb0Offset = logFetcher.getLogScannerStatus().getBucketOffset(tb0);
assertThat(tb0Offset)
.as(
"Fetch offset must advance past filtered data, "
+ "confirming filteredEndOffset survived RPC round-trip")
.isGreaterThan(0L);
});
} else {
// Offset already advanced on first fetch — filteredEndOffset RPC round-trip confirmed
assertThat(logFetcher.getLogScannerStatus().getBucketOffset(tb0))
.as("Fetch offset must advance past filtered data")
.isGreaterThan(0L);
}

// After offset advancement, fetcher should be clean
assertThat(logFetcher.getCompletedFetchesSize()).isEqualTo(0);
}

@Test
Expand All @@ -284,9 +308,8 @@ void testConsecutiveFetchesWithFilteredOffsetAdvancement() throws Exception {
LogRecordBatchStatisticsTestUtils.createLogRecordsWithStatistics(
MATCHING_DATA, FILTER_TEST_ROW_TYPE, 0L, DEFAULT_SCHEMA_ID));

// Use retry loop to handle the case where bucket 0's HW hasn't advanced yet
// on the first fetch cycle. The filtered-empty response (with filteredEndOffset)
// is only returned when the server has data up to HW to scan.
// Flatten retry: send-wait-collect in a single loop until bucket 0's offset advances.
// This avoids the fragile nested retry pattern that could accumulate in-flight requests.
retry(
Duration.ofMinutes(1),
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX;
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
Expand Down Expand Up @@ -150,6 +151,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(
context.getCatalogTable().getOptions(), context.getConfiguration()),
toFlussTableConfig(tableOptions),
tableOutputType,
primaryKeyIndexes,
bucketKeyIndexes,
Expand Down Expand Up @@ -273,6 +275,24 @@ private static Configuration toFlussClientConfig(
return flussConfig;
}

private static Configuration toFlussTableConfig(ReadableConfig tableOptions) {
    // Collect only the table-scoped ("table.*") options into a fresh Fluss Configuration.
    // We iterate the statically known option set instead of calling toMap(), which keeps
    // compatibility with Flink versions whose ReadableConfig lacks that method.
    Configuration tableConfig = new Configuration();

    for (ConfigOption<?> option : FlinkConnectorOptions.TABLE_OPTIONS) {
        String key = option.key();
        if (!key.startsWith(TABLE_PREFIX)) {
            continue;
        }
        // Only forward options the user actually set; store them as strings
        // since Configuration is string-keyed/string-valued at this layer.
        tableOptions
                .getOptional(option)
                .ifPresent(value -> tableConfig.setString(key, value.toString()));
    }

    return tableConfig;
}

private static TablePath toFlussTablePath(ObjectIdentifier tablePath) {
    // Translate a Flink catalog identifier into the equivalent Fluss table path:
    // the Flink database maps to the Fluss database, the object name to the table name.
    String databaseName = tablePath.getDatabaseName();
    String tableName = tablePath.getObjectName();
    return TablePath.of(databaseName, tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
isPartitioned,
flussRowType,
projectedFields,
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
new BinlogDeserializationSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
isPartitioned(),
flussRowType,
projectedFields,
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
new ChangelogDeserializationSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ public class FlinkSource<OUT>
@Nullable private final LakeSource<LakeSplit> lakeSource;
private final LeaseContext leaseContext;

@Nullable private final Predicate logRecordBatchFilter;

public FlinkSource(
Configuration flussConf,
TablePath tablePath,
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
Expand All @@ -93,6 +96,7 @@ public FlinkSource(
isPartitioned,
sourceOutputType,
projectedFields,
logRecordBatchFilter,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
Expand All @@ -109,6 +113,7 @@ public FlinkSource(
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
Expand All @@ -122,6 +127,7 @@ public FlinkSource(
this.isPartitioned = isPartitioned;
this.sourceOutputType = sourceOutputType;
this.projectedFields = projectedFields;
this.logRecordBatchFilter = logRecordBatchFilter;
this.offsetsInitializer = offsetsInitializer;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.deserializationSchema = deserializationSchema;
Expand Down Expand Up @@ -213,6 +219,7 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
sourceOutputType,
context,
projectedFields,
logRecordBatchFilter,
flinkSourceReaderMetrics,
recordEmitter,
lakeSource);
Expand Down
Loading
Loading