Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.predicate.Predicate;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -60,6 +61,19 @@ public interface Scan {
*/
Scan limit(int rowNumber);

/**
 * Returns a new {@link Scan} that pushes the supplied predicate down to the scanner.
 *
 * <p>Note: filtering is currently applied at the record batch level for log scanners only;
 * individual rows are not filtered, so the computing engine still has to perform secondary
 * filtering on the returned results. Batch scanners do not support filter pushdown.
 *
 * @param predicate the predicate used for record batch level filtering, or null for none
 */
default Scan filter(@Nullable Predicate predicate) {
    throw new UnsupportedOperationException("Filter pushdown is not supported by this Scan.");
}

/**
* Creates a {@link LogScanner} to continuously read log data for this scan.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
Expand All @@ -51,29 +52,36 @@ public class TableScan implements Scan {

/** The projected fields to do projection. No projection if is null. */
@Nullable private final int[] projectedColumns;

/** The limited row number to read. No limit if is null. */
@Nullable private final Integer limit;

/** The record batch filter to apply. No filter if is null. */
@Nullable private final Predicate recordBatchFilter;

/**
 * Creates a scan over the given table with no projection, no row limit and no record batch
 * filter applied.
 *
 * @param conn the connection used to talk to the cluster
 * @param tableInfo the metadata of the table to scan
 * @param schemaGetter the getter used to resolve the table schema
 */
public TableScan(FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
    // Delegate to the full constructor; the diff residue duplicating the old
    // five-argument call is dropped, keeping only the six-argument delegation.
    this(conn, tableInfo, schemaGetter, null, null, null);
}

/**
 * Full private constructor used by the fluent {@code project}/{@code limit}/{@code filter}
 * methods to derive new immutable scan instances.
 *
 * @param conn the connection used to talk to the cluster
 * @param tableInfo the metadata of the table to scan
 * @param schemaGetter the getter used to resolve the table schema
 * @param projectedColumns the columns to project, or null for no projection
 * @param limit the maximum number of rows to read, or null for no limit
 * @param recordBatchFilter the record batch filter to apply, or null for no filter
 */
private TableScan(
        FlussConnection conn,
        TableInfo tableInfo,
        SchemaGetter schemaGetter,
        @Nullable int[] projectedColumns,
        @Nullable Integer limit,
        @Nullable Predicate recordBatchFilter) {
    this.conn = conn;
    this.tableInfo = tableInfo;
    this.projectedColumns = projectedColumns;
    this.limit = limit;
    this.schemaGetter = schemaGetter;
    this.recordBatchFilter = recordBatchFilter;
}

@Override
public Scan project(@Nullable int[] projectedColumns) {
    // Derive a new scan with only the projected columns changed; all other
    // options (limit, record batch filter) are carried over unchanged. The
    // stale pre-change return statement from the diff residue is removed.
    return new TableScan(
            conn, tableInfo, schemaGetter, projectedColumns, limit, recordBatchFilter);
}

@Override
Expand All @@ -92,12 +100,19 @@ public Scan project(List<String> projectedColumnNames) {
}
columnIndexes[i] = index;
}
return new TableScan(conn, tableInfo, schemaGetter, columnIndexes, limit);
return new TableScan(
conn, tableInfo, schemaGetter, columnIndexes, limit, recordBatchFilter);
}

@Override
public Scan limit(int rowNumber) {
    // Derive a new scan with only the row limit changed; all other options
    // (projection, record batch filter) are carried over unchanged. The
    // stale pre-change return statement from the diff residue is removed.
    return new TableScan(
            conn, tableInfo, schemaGetter, projectedColumns, rowNumber, recordBatchFilter);
}

@Override
public Scan filter(@Nullable Predicate predicate) {
    // Derive a new immutable scan carrying the given record batch filter while
    // preserving the current projection and limit.
    TableScan filtered =
            new TableScan(conn, tableInfo, schemaGetter, projectedColumns, limit, predicate);
    return filtered;
}

@Override
Expand All @@ -116,7 +131,8 @@ public LogScanner createLogScanner() {
conn.getClientMetricGroup(),
conn.getOrCreateRemoteFileDownloader(),
projectedColumns,
schemaGetter);
schemaGetter,
recordBatchFilter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.messages.FetchLogRequest;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.utils.CloseableIterator;

Expand All @@ -40,10 +39,12 @@
import java.util.Iterator;
import java.util.List;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* {@link CompletedFetch} represents the result that was returned from the tablet server via a fetch
* log request, which can be a {@link LogRecordBatch} or remote log segments path. It contains logic
* to maintain state between calls to {@link #fetchRecords(int)}.
*/
@Internal
abstract class CompletedFetch {
Expand All @@ -53,6 +54,7 @@ abstract class CompletedFetch {
final ApiError error;
final int sizeInBytes;
final long highWatermark;
private final long fetchOffset;

private final boolean isCheckCrcs;
private final Iterator<LogRecordBatch> batches;
Expand All @@ -79,7 +81,8 @@ public CompletedFetch(
LogRecordReadContext readContext,
LogScannerStatus logScannerStatus,
boolean isCheckCrcs,
long fetchOffset) {
long fetchOffset,
long nextFetchOffset) {
this.tableBucket = tableBucket;
this.error = error;
this.sizeInBytes = sizeInBytes;
Expand All @@ -88,8 +91,21 @@ public CompletedFetch(
this.readContext = readContext;
this.isCheckCrcs = isCheckCrcs;
this.logScannerStatus = logScannerStatus;
this.nextFetchOffset = fetchOffset;
this.selectedFieldGetters = readContext.getSelectedFieldGetters();
this.fetchOffset = fetchOffset;
checkArgument(
nextFetchOffset == -1 || nextFetchOffset >= fetchOffset,
"nextFetchOffset (%s) must be -1 or >= fetchOffset (%s) for bucket %s.",
nextFetchOffset,
fetchOffset,
tableBucket);
checkArgument(
nextFetchOffset == -1 || sizeInBytes == 0,
"When nextFetchOffset is set (%s), records must be empty (sizeInBytes=%s) for bucket %s.",
nextFetchOffset,
sizeInBytes,
tableBucket);
this.nextFetchOffset = nextFetchOffset != -1 ? nextFetchOffset : fetchOffset;
}

// TODO: optimize this to avoid deep copying the record.
Expand All @@ -113,6 +129,10 @@ boolean isInitialized() {
return initialized;
}

/** Returns the offset this fetch was originally requested at. */
long fetchOffset() {
    return fetchOffset;
}

/** Returns the offset that should be fetched next after consuming this fetch. */
long nextFetchOffset() {
    return nextFetchOffset;
}
Expand Down Expand Up @@ -178,6 +198,8 @@ public List<ScanRecord> fetchRecords(int maxRecords) {
ScanRecord record = toScanRecord(lastRecord);
scanRecords.add(record);
recordsRead++;
// Per-record offset is a best-effort value; the authoritative offset
// comes from the batch's nextLogOffset once the batch is fully consumed.
nextFetchOffset = lastRecord.logOffset() + 1;
cachedRecordException = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public DefaultCompletedFetch(
readContext,
logScannerStatus,
isCheckCrc,
fetchOffset);
fetchOffset,
fetchLogResultForBucket.getFilteredEndOffset());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRec
"Ignoring fetched records for {} at offset {} since the current offset is null which means the "
+ "bucket has been unsubscribe.",
tb,
nextInLineFetch.nextFetchOffset());
nextInLineFetch.fetchOffset());
} else {
if (nextInLineFetch.nextFetchOffset() == offset) {
// When server-side filtering is enabled, the CompletedFetch's nextFetchOffset
// may have been advanced by filteredEndOffset (all batches filtered out).
// The fetchOffset still matches the original request offset.
if (nextInLineFetch.fetchOffset() == offset) {
List<ScanRecord> records = nextInLineFetch.fetchRecords(maxRecords);
LOG.trace(
"Returning {} fetched records at offset {} for assigned bucket {}.",
Expand All @@ -180,7 +183,7 @@ private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRec
LOG.warn(
"Ignoring fetched records for {} at offset {} since the current offset is {}",
nextInLineFetch.tableBucket,
nextInLineFetch.nextFetchOffset(),
nextInLineFetch.fetchOffset(),
offset);
}
}
Expand Down Expand Up @@ -215,7 +218,7 @@ private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRec

private @Nullable CompletedFetch handleInitializeSuccess(CompletedFetch completedFetch) {
TableBucket tb = completedFetch.tableBucket;
long fetchOffset = completedFetch.nextFetchOffset();
long fetchOffset = completedFetch.fetchOffset();

// we are interested in this fetch only if the beginning offset matches the
// current consumed position.
Expand Down Expand Up @@ -249,7 +252,7 @@ private List<ScanRecord> fetchRecords(CompletedFetch nextInLineFetch, int maxRec
private void handleInitializeErrors(
CompletedFetch completedFetch, Errors error, String errorMessage) {
TableBucket tb = completedFetch.tableBucket;
long fetchOffset = completedFetch.nextFetchOffset();
long fetchOffset = completedFetch.fetchOffset();
if (error == Errors.NOT_LEADER_OR_FOLLOWER
|| error == Errors.LOG_STORAGE_EXCEPTION
|| error == Errors.KV_STORAGE_EXCEPTION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
Expand All @@ -52,6 +53,8 @@
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.rpc.util.PredicateMessageUtils;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;

Expand Down Expand Up @@ -93,6 +96,10 @@ public class LogFetcher implements Closeable {
// bytes from remote file.
private final LogRecordReadContext remoteReadContext;
@Nullable private final Projection projection;
@Nullable private final Predicate recordBatchFilter;
@Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate;
private final RowType fullRowType;
private final int schemaId;
private final int maxFetchBytes;
private final int maxBucketFetchBytes;
private final int minFetchBytes;
Expand All @@ -115,6 +122,7 @@ public class LogFetcher implements Closeable {
public LogFetcher(
TableInfo tableInfo,
@Nullable Projection projection,
@Nullable Predicate recordBatchFilter,
LogScannerStatus logScannerStatus,
Configuration conf,
MetadataUpdater metadataUpdater,
Expand All @@ -128,6 +136,14 @@ public LogFetcher(
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
this.projection = projection;
this.recordBatchFilter = recordBatchFilter;
this.cachedPbPredicate =
recordBatchFilter != null
? PredicateMessageUtils.toPbPredicate(
recordBatchFilter, tableInfo.getRowType())
: null;
this.fullRowType = tableInfo.getRowType();
this.schemaId = tableInfo.getSchemaId();
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes =
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
Expand Down Expand Up @@ -386,9 +402,10 @@ private synchronized void handleFetchLogResponse(
} else {
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
if (!MemoryLogRecords.EMPTY.equals(logRecords)
|| fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
// In oder to not signal notEmptyCondition, add completed
// fetch to buffer until log records is not empty.
|| fetchResultForBucket.getErrorCode() != Errors.NONE.code()
|| fetchResultForBucket.hasFilteredEndOffset()) {
// In order to not signal notEmptyCondition spuriously, only add the completed
// fetch to the buffer when the log records are not empty.
DefaultCompletedFetch completedFetch =
new DefaultCompletedFetch(
tb,
Expand Down Expand Up @@ -534,6 +551,10 @@ Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
} else {
reqForTable.setProjectionPushdownEnabled(false);
}
if (null != cachedPbPredicate) {
reqForTable.setFilterPredicate(cachedPbPredicate);
reqForTable.setFilterSchemaId(schemaId);
}
reqForTable.addAllBucketsReqs(reqForBuckets);
fetchLogRequest.addAllTablesReqs(Collections.singletonList(reqForTable));
fetchLogRequests.put(leaderId, fetchLogRequest);
Expand Down Expand Up @@ -577,6 +598,11 @@ public synchronized void close() throws IOException {
}
}

/** Exposes the scanner status; marked visible-for-testing, not for production callers. */
@VisibleForTesting
LogScannerStatus getLogScannerStatus() {
    return logScannerStatus;
}

@VisibleForTesting
int getCompletedFetchesSize() {
return logFetchBuffer.bufferedBuckets().size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.Projection;
Expand Down Expand Up @@ -84,7 +85,8 @@ public LogScannerImpl(
ClientMetricGroup clientMetricGroup,
RemoteFileDownloader remoteFileDownloader,
@Nullable int[] projectedFields,
SchemaGetter schemaGetter) {
SchemaGetter schemaGetter,
@Nullable Predicate recordBatchFilter) {
this.tablePath = tableInfo.getTablePath();
this.tableId = tableInfo.getTableId();
this.isPartitionedTable = tableInfo.isPartitioned();
Expand All @@ -98,6 +100,7 @@ public LogScannerImpl(
new LogFetcher(
tableInfo,
projection,
recordBatchFilter,
logScannerStatus,
conf,
metadataUpdater,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class RemoteCompletedFetch extends CompletedFetch {
readContext,
logScannerStatus,
isCheckCrc,
fetchOffset);
fetchOffset,
-1);
this.fileLogRecords = fileLogRecords;
this.recycleCallback = recycleCallback;
}
Expand Down
Loading
Loading