diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 2b7a5e7329..430ee6b8aa 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -22,6 +22,7 @@
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.predicate.Predicate;
import javax.annotation.Nullable;
@@ -60,6 +61,19 @@ public interface Scan {
*/
Scan limit(int rowNumber);
+ /**
+ * Returns a new scan from this that will apply the given predicate filter.
+ *
+ *
+ * <p>Note: the filter currently only supports record batch level filtering for log scanners,
+ * not row level filtering. The computing engine still needs to perform secondary filtering on
+ * the results. Batch scanners do not support filter pushdown.
+ *
+ * @param predicate the predicate to apply for record batch level filtering
+ */
+ default Scan filter(@Nullable Predicate predicate) {
+ throw new UnsupportedOperationException("Filter pushdown is not supported by this Scan.");
+ }
+
/**
* Creates a {@link LogScanner} to continuously read log data for this scan.
*
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 640f2541a3..5b7718c7ad 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -34,6 +34,7 @@
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
@@ -51,11 +52,15 @@ public class TableScan implements Scan {
/** The projected fields to do projection. No projection if is null. */
@Nullable private final int[] projectedColumns;
+
/** The limited row number to read. No limit if is null. */
@Nullable private final Integer limit;
+ /** The record batch filter to apply. No filter if is null. */
+ @Nullable private final Predicate recordBatchFilter;
+
public TableScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
- this(conn, tableInfo, schemaGetter, null, null);
+ this(conn, tableInfo, schemaGetter, null, null, null);
}
private TableScan(
@@ -63,17 +68,20 @@ private TableScan(
TableInfo tableInfo,
SchemaGetter schemaGetter,
@Nullable int[] projectedColumns,
- @Nullable Integer limit) {
+ @Nullable Integer limit,
+ @Nullable Predicate recordBatchFilter) {
this.conn = conn;
this.tableInfo = tableInfo;
this.projectedColumns = projectedColumns;
this.limit = limit;
this.schemaGetter = schemaGetter;
+ this.recordBatchFilter = recordBatchFilter;
}
@Override
public Scan project(@Nullable int[] projectedColumns) {
- return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit);
+ return new TableScan(
+ conn, tableInfo, schemaGetter, projectedColumns, limit, recordBatchFilter);
}
@Override
@@ -92,12 +100,19 @@ public Scan project(List projectedColumnNames) {
}
columnIndexes[i] = index;
}
- return new TableScan(conn, tableInfo, schemaGetter, columnIndexes, limit);
+ return new TableScan(
+ conn, tableInfo, schemaGetter, columnIndexes, limit, recordBatchFilter);
}
@Override
public Scan limit(int rowNumber) {
- return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, rowNumber);
+ return new TableScan(
+ conn, tableInfo, schemaGetter, projectedColumns, rowNumber, recordBatchFilter);
+ }
+
+ @Override
+ public Scan filter(@Nullable Predicate predicate) {
+ return new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit, predicate);
}
@Override
@@ -116,7 +131,8 @@ public LogScanner createLogScanner() {
conn.getClientMetricGroup(),
conn.getOrCreateRemoteFileDownloader(),
projectedColumns,
- schemaGetter);
+ schemaGetter,
+ recordBatchFilter);
}
@Override
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java
index 8f29f3ef34..0113335210 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/CompletedFetch.java
@@ -27,7 +27,6 @@
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.rpc.messages.FetchLogRequest;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.utils.CloseableIterator;
@@ -40,10 +39,12 @@
import java.util.Iterator;
import java.util.List;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
/**
- * {@link CompletedFetch} represents the result that was returned from the tablet server via a
- * {@link FetchLogRequest}, which can be a {@link LogRecordBatch} or remote log segments path. It
- * contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ * {@link CompletedFetch} represents the result that was returned from the tablet server via a fetch
+ * log request, which can be a {@link LogRecordBatch} or remote log segments path. It contains logic
+ * to maintain state between calls to {@link #fetchRecords(int)}.
*/
@Internal
abstract class CompletedFetch {
@@ -53,6 +54,7 @@ abstract class CompletedFetch {
final ApiError error;
final int sizeInBytes;
final long highWatermark;
+ private final long fetchOffset;
private final boolean isCheckCrcs;
private final Iterator batches;
@@ -79,7 +81,8 @@ public CompletedFetch(
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrcs,
- long fetchOffset) {
+ long fetchOffset,
+ long nextFetchOffset) {
this.tableBucket = tableBucket;
this.error = error;
this.sizeInBytes = sizeInBytes;
@@ -88,8 +91,21 @@ public CompletedFetch(
this.readContext = readContext;
this.isCheckCrcs = isCheckCrcs;
this.logScannerStatus = logScannerStatus;
- this.nextFetchOffset = fetchOffset;
this.selectedFieldGetters = readContext.getSelectedFieldGetters();
+ this.fetchOffset = fetchOffset;
+ checkArgument(
+ nextFetchOffset == -1 || nextFetchOffset >= fetchOffset,
+ "nextFetchOffset (%s) must be -1 or >= fetchOffset (%s) for bucket %s.",
+ nextFetchOffset,
+ fetchOffset,
+ tableBucket);
+ checkArgument(
+ nextFetchOffset == -1 || sizeInBytes == 0,
+ "When nextFetchOffset is set (%s), records must be empty (sizeInBytes=%s) for bucket %s.",
+ nextFetchOffset,
+ sizeInBytes,
+ tableBucket);
+ this.nextFetchOffset = nextFetchOffset != -1 ? nextFetchOffset : fetchOffset;
}
// TODO: optimize this to avoid deep copying the record.
@@ -113,6 +129,10 @@ boolean isInitialized() {
return initialized;
}
+ long fetchOffset() {
+ return fetchOffset;
+ }
+
long nextFetchOffset() {
return nextFetchOffset;
}
@@ -178,6 +198,8 @@ public List fetchRecords(int maxRecords) {
ScanRecord record = toScanRecord(lastRecord);
scanRecords.add(record);
recordsRead++;
+ // Per-record offset is a best-effort value; the authoritative offset
+ // comes from the batch's nextLogOffset once the batch is fully consumed.
nextFetchOffset = lastRecord.logOffset() + 1;
cachedRecordException = null;
}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
index 95078bf36e..10c0298bad 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetch.java
@@ -46,6 +46,7 @@ public DefaultCompletedFetch(
readContext,
logScannerStatus,
isCheckCrc,
- fetchOffset);
+ fetchOffset,
+ fetchLogResultForBucket.getFilteredEndOffset());
}
}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
index 34a3ec86e6..69791373a6 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchCollector.java
@@ -154,9 +154,12 @@ private List fetchRecords(CompletedFetch nextInLineFetch, int maxRec
"Ignoring fetched records for {} at offset {} since the current offset is null which means the "
+ "bucket has been unsubscribe.",
tb,
- nextInLineFetch.nextFetchOffset());
+ nextInLineFetch.fetchOffset());
} else {
- if (nextInLineFetch.nextFetchOffset() == offset) {
+ // When server-side filtering is enabled, the CompletedFetch's nextFetchOffset
+ // may have been advanced by filteredEndOffset (all batches filtered out).
+ // The fetchOffset still matches the original request offset.
+ if (nextInLineFetch.fetchOffset() == offset) {
List records = nextInLineFetch.fetchRecords(maxRecords);
LOG.trace(
"Returning {} fetched records at offset {} for assigned bucket {}.",
@@ -180,7 +183,7 @@ private List fetchRecords(CompletedFetch nextInLineFetch, int maxRec
LOG.warn(
"Ignoring fetched records for {} at offset {} since the current offset is {}",
nextInLineFetch.tableBucket,
- nextInLineFetch.nextFetchOffset(),
+ nextInLineFetch.fetchOffset(),
offset);
}
}
@@ -215,7 +218,7 @@ private List fetchRecords(CompletedFetch nextInLineFetch, int maxRec
private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
TableBucket tb = completedFetch.tableBucket;
- long fetchOffset = completedFetch.nextFetchOffset();
+ long fetchOffset = completedFetch.fetchOffset();
// we are interested in this fetch only if the beginning offset matches the
// current consumed position.
@@ -249,7 +252,7 @@ private List fetchRecords(CompletedFetch nextInLineFetch, int maxRec
private void handleInitializeErrors(
CompletedFetch completedFetch, Errors error, String errorMessage) {
TableBucket tb = completedFetch.tableBucket;
- long fetchOffset = completedFetch.nextFetchOffset();
+ long fetchOffset = completedFetch.fetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER
|| error == Errors.LOG_STORAGE_EXCEPTION
|| error == Errors.KV_STORAGE_EXCEPTION
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
index b5a03a1caf..92f6802f87 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java
@@ -37,6 +37,7 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
@@ -52,6 +53,8 @@
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.rpc.util.PredicateMessageUtils;
+import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;
@@ -93,6 +96,10 @@ public class LogFetcher implements Closeable {
// bytes from remote file.
private final LogRecordReadContext remoteReadContext;
@Nullable private final Projection projection;
+ @Nullable private final Predicate recordBatchFilter;
+ @Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate;
+ private final RowType fullRowType;
+ private final int schemaId;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final int minFetchBytes;
@@ -115,6 +122,7 @@ public class LogFetcher implements Closeable {
public LogFetcher(
TableInfo tableInfo,
@Nullable Projection projection,
+ @Nullable Predicate recordBatchFilter,
LogScannerStatus logScannerStatus,
Configuration conf,
MetadataUpdater metadataUpdater,
@@ -128,6 +136,14 @@ public LogFetcher(
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
this.projection = projection;
+ this.recordBatchFilter = recordBatchFilter;
+ this.cachedPbPredicate =
+ recordBatchFilter != null
+ ? PredicateMessageUtils.toPbPredicate(
+ recordBatchFilter, tableInfo.getRowType())
+ : null;
+ this.fullRowType = tableInfo.getRowType();
+ this.schemaId = tableInfo.getSchemaId();
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
@@ -386,9 +402,10 @@ private synchronized void handleFetchLogResponse(
} else {
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
if (!MemoryLogRecords.EMPTY.equals(logRecords)
- || fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
- // In oder to not signal notEmptyCondition, add completed
- // fetch to buffer until log records is not empty.
+ || fetchResultForBucket.getErrorCode() != Errors.NONE.code()
+ || fetchResultForBucket.hasFilteredEndOffset()) {
+ // In order to not signal notEmptyCondition, add completed fetch to
+ // buffer until log records are not empty.
DefaultCompletedFetch completedFetch =
new DefaultCompletedFetch(
tb,
@@ -534,6 +551,10 @@ Map prepareFetchLogRequests() {
} else {
reqForTable.setProjectionPushdownEnabled(false);
}
+ if (null != cachedPbPredicate) {
+ reqForTable.setFilterPredicate(cachedPbPredicate);
+ reqForTable.setFilterSchemaId(schemaId);
+ }
reqForTable.addAllBucketsReqs(reqForBuckets);
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
fetchLogRequests.put(leaderId, fetchLogRequest);
@@ -577,6 +598,11 @@ public synchronized void close() throws IOException {
}
}
+ @VisibleForTesting
+ LogScannerStatus getLogScannerStatus() {
+ return logScannerStatus;
+ }
+
@VisibleForTesting
int getCompletedFetchesSize() {
return logFetchBuffer.bufferedBuckets().size();
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
index 33181bd7ae..48953da5c2 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java
@@ -28,6 +28,7 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.Projection;
@@ -84,7 +85,8 @@ public LogScannerImpl(
ClientMetricGroup clientMetricGroup,
RemoteFileDownloader remoteFileDownloader,
@Nullable int[] projectedFields,
- SchemaGetter schemaGetter) {
+ SchemaGetter schemaGetter,
+ @Nullable Predicate recordBatchFilter) {
this.tablePath = tableInfo.getTablePath();
this.tableId = tableInfo.getTableId();
this.isPartitionedTable = tableInfo.isPartitioned();
@@ -98,6 +100,7 @@ public LogScannerImpl(
new LogFetcher(
tableInfo,
projection,
+ recordBatchFilter,
logScannerStatus,
conf,
metadataUpdater,
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java
index ef867a8f44..00ff712aa9 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteCompletedFetch.java
@@ -55,7 +55,8 @@ class RemoteCompletedFetch extends CompletedFetch {
readContext,
logScannerStatus,
isCheckCrc,
- fetchOffset);
+ fetchOffset,
+ -1);
this.fileLogRecords = fileLogRecords;
this.recycleCallback = recycleCallback;
}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java
new file mode 100644
index 0000000000..a5bd5a4eef
--- /dev/null
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetcherFilterITCase.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.log;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.client.metrics.TestingScannerMetricGroup;
+import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.predicate.PredicateBuilder;
+import org.apache.fluss.record.LogRecordBatchStatisticsTestUtils;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.rpc.RpcClient;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for {@link LogFetcher} with recordBatchFilter pushdown scenarios. */
+public class LogFetcherFilterITCase extends ClientToServerITCaseBase {
+ private LogFetcher logFetcher;
+ private long tableId;
+ private final int bucketId0 = 0;
+ private final int bucketId1 = 1;
+
+ // Use the same row type as DATA1 to avoid Arrow compatibility issues
+ private static final RowType FILTER_TEST_ROW_TYPE = DATA1_ROW_TYPE;
+
+ // Data that should match filter (a > 5) - using DATA1 compatible structure
+ private static final List