diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java index 8532f2a856..79cffa999d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/FlussTable.java @@ -23,7 +23,9 @@ import org.apache.fluss.client.lookup.TableLookup; import org.apache.fluss.client.metadata.ClientSchemaGetter; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.SnapshotQuery; import org.apache.fluss.client.table.scanner.TableScan; +import org.apache.fluss.client.table.scanner.TableSnapshotQuery; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.TableAppend; import org.apache.fluss.client.table.writer.TableUpsert; @@ -67,6 +69,15 @@ public Scan newScan() { return new TableScan(conn, tableInfo, schemaGetter); } + @Override + public SnapshotQuery newSnapshotQuery() { + checkState( + hasPrimaryKey, + "Table %s is not a Primary Key Table and doesn't support SnapshotQuery.", + tablePath); + return new TableSnapshotQuery(conn, tableInfo, schemaGetter); + } + @Override public Lookup newLookup() { return new TableLookup( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java index 813b62034a..287311e9ae 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/Table.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.lookup.Lookup; import org.apache.fluss.client.lookup.Lookuper; import org.apache.fluss.client.table.scanner.Scan; +import org.apache.fluss.client.table.scanner.SnapshotQuery; import org.apache.fluss.client.table.writer.Append; import org.apache.fluss.client.table.writer.AppendWriter; import 
org.apache.fluss.client.table.writer.Upsert; @@ -55,6 +56,12 @@ public interface Table extends AutoCloseable { */ Scan newScan(); + /** + * Creates a new {@link SnapshotQuery} for this table to configure and execute a snapshot query + * to read all current data in a table bucket (requires to be a Primary Key Table). + */ + SnapshotQuery newSnapshotQuery(); + /** * Creates a new {@link Lookup} for this table to configure and create a {@link Lookuper} to * lookup data for this table by primary key or a prefix of primary key. diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java new file mode 100644 index 0000000000..26b6c2920a --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SnapshotQuery.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * Used to configure and execute a snapshot query to read all kv data of a primary key table.
+ *
+ * <p>Both {@code execute} variants hand back a {@link CloseableIterator}; the caller owns it and
+ * should close it (for example with try-with-resources) once it is done reading.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public interface SnapshotQuery {
+    /**
+     * Executes the snapshot query to read all current data in the given table bucket.
+     *
+     * @param tableBucket the table bucket to read
+     * @return a closeable iterator of the rows in the table bucket
+     */
+    CloseableIterator<InternalRow> execute(TableBucket tableBucket);
+
+    /**
+     * Executes the snapshot query to read all current data in the table. Everything around
+     * partitions and buckets will be taken care of from the client.
+     *
+     * @return a closeable iterator of the rows in the table
+     */
+    CloseableIterator<InternalRow> execute();
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java index 0d1d28a0e2..bcd5a2ec5b 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java @@ -21,6 +21,7 @@ import org.apache.fluss.client.admin.Admin; import org.apache.fluss.client.metadata.KvSnapshotMetadata; import org.apache.fluss.client.table.scanner.batch.BatchScanner; +import org.apache.fluss.client.table.scanner.batch.KvBatchScanner; import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner; import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner; import org.apache.fluss.client.table.scanner.log.LogScanner; @@ -123,10 +124,19 @@ public TypedLogScanner createTypedLogScanner(Class pojoClass) { @Override public BatchScanner createBatchScanner(TableBucket tableBucket) { + if (tableInfo.hasPrimaryKey()) { +
return new KvBatchScanner( + tableInfo, + tableBucket, + schemaGetter, + conn.getMetadataUpdater(), + projectedColumns, + limit == null ? null : (long) limit); + } if (limit == null) { throw new UnsupportedOperationException( String.format( - "Currently, BatchScanner is only available when limit is set. Table: %s, bucket: %s", + "Currently, for log table, BatchScanner is only available when limit is set. Table: %s, bucket: %s", tableInfo.getTablePath(), tableBucket)); } return new LimitBatchScanner( diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java new file mode 100644 index 0000000000..eb4c7ac022 --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableSnapshotQuery.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.fluss.client.table.scanner;
+
+import org.apache.fluss.client.FlussConnection;
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PartitionInfo;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link SnapshotQuery}.
+ *
+ * <p>Each bucket is read through a {@link BatchScanner} obtained from a fresh {@link TableScan};
+ * the whole-table variant walks the buckets one after another.
+ */
+public class TableSnapshotQuery implements SnapshotQuery {
+
+    private final FlussConnection conn;
+    private final TableInfo tableInfo;
+    private final SchemaGetter schemaGetter;
+
+    /** The projected fields to do projection. No projection if is null. */
+    @Nullable private final int[] projectedColumns;
+
+    public TableSnapshotQuery(
+            FlussConnection conn, TableInfo tableInfo, SchemaGetter schemaGetter) {
+        this(conn, tableInfo, schemaGetter, null);
+    }
+
+    private TableSnapshotQuery(
+            FlussConnection conn,
+            TableInfo tableInfo,
+            SchemaGetter schemaGetter,
+            @Nullable int[] projectedColumns) {
+        this.conn = conn;
+        this.tableInfo = tableInfo;
+        this.schemaGetter = schemaGetter;
+        this.projectedColumns = projectedColumns;
+    }
+
+    /**
+     * Reads all rows of a single bucket.
+     *
+     * @param tableBucket the bucket to scan
+     * @return an iterator over the bucket's rows; closing it closes the underlying scanner
+     */
+    @Override
+    public CloseableIterator<InternalRow> execute(TableBucket tableBucket) {
+        Scan scan = new TableScan(conn, tableInfo, schemaGetter);
+        if (projectedColumns != null) {
+            scan = scan.project(projectedColumns);
+        }
+        return new BatchScannerIterator(scan.createBatchScanner(tableBucket));
+    }
+
+    /**
+     * Reads all rows of the table by scanning every bucket (of every partition, if the table is
+     * partitioned) sequentially.
+     *
+     * @return an iterator over all rows of the table
+     * @throws FlussRuntimeException if the partitions of a partitioned table cannot be listed
+     */
+    @Override
+    public CloseableIterator<InternalRow> execute() {
+        return new MultiBucketBatchScannerIterator(listBuckets());
+    }
+
+    /** Enumerates every bucket of the table, expanding partitions when the table has them. */
+    private List<TableBucket> listBuckets() {
+        int numBuckets = tableInfo.getNumBuckets();
+        List<TableBucket> buckets = new ArrayList<>();
+        if (!tableInfo.isPartitioned()) {
+            for (int i = 0; i < numBuckets; i++) {
+                buckets.add(new TableBucket(tableInfo.getTableId(), i));
+            }
+            return buckets;
+        }
+
+        List<PartitionInfo> partitions;
+        try {
+            partitions = conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get();
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // keep the interrupt visible to the caller instead of silently consuming it
+                Thread.currentThread().interrupt();
+            }
+            throw new FlussRuntimeException(
+                    "Failed to list partitions for table " + tableInfo.getTablePath(), e);
+        }
+        for (PartitionInfo partition : partitions) {
+            for (int i = 0; i < numBuckets; i++) {
+                buckets.add(
+                        new TableBucket(tableInfo.getTableId(), partition.getPartitionId(), i));
+            }
+        }
+        return buckets;
+    }
+
+    /**
+     * Chains the per-bucket iterators: a bucket's scanner is only created when the previous
+     * bucket is drained, and is closed before moving on.
+     */
+    private class MultiBucketBatchScannerIterator implements CloseableIterator<InternalRow> {
+        private final Iterator<TableBucket> bucketIterator;
+        @Nullable private CloseableIterator<InternalRow> currentScannerIterator;
+        private boolean isClosed = false;
+
+        private MultiBucketBatchScannerIterator(List<TableBucket> buckets) {
+            this.bucketIterator = buckets.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (isClosed) {
+                return false;
+            }
+            while (currentScannerIterator == null || !currentScannerIterator.hasNext()) {
+                closeCurrent();
+                if (!bucketIterator.hasNext()) {
+                    return false;
+                }
+                currentScannerIterator = execute(bucketIterator.next());
+            }
+            return true;
+        }
+
+        @Override
+        public InternalRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentScannerIterator.next();
+        }
+
+        @Override
+        public void close() {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+            closeCurrent();
+        }
+
+        /** Closes and forgets the iterator of the bucket currently being read, if any. */
+        private void closeCurrent() {
+            if (currentScannerIterator != null) {
+                currentScannerIterator.close();
+                currentScannerIterator = null;
+            }
+        }
+    }
+
+    /**
+     * Flattens the batches polled from one {@link BatchScanner} into a single row iterator.
+     *
+     * <p>The scanner is closed as soon as it reports the end of data (a {@code null} batch) or
+     * when {@link #close()} is called, whichever happens first. After closing, {@link
+     * #hasNext()} returns {@code false}.
+     */
+    private static class BatchScannerIterator implements CloseableIterator<InternalRow> {
+        /** Upper bound for a single poll; an empty poll is simply retried. */
+        private static final Duration POLL_TIMEOUT = Duration.ofMinutes(1);
+
+        private final BatchScanner scanner;
+        @Nullable private CloseableIterator<InternalRow> currentBatch;
+        private boolean isClosed = false;
+
+        private BatchScannerIterator(BatchScanner scanner) {
+            this.scanner = scanner;
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureBatch();
+            return currentBatch != null && currentBatch.hasNext();
+        }
+
+        @Override
+        public InternalRow next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return currentBatch.next();
+        }
+
+        /** Polls until a non-empty batch is available or the scanner is exhausted/closed. */
+        private void ensureBatch() {
+            while (!isClosed && (currentBatch == null || !currentBatch.hasNext())) {
+                // the previous batch is fully consumed; release it before fetching the next
+                releaseCurrentBatch();
+
+                CloseableIterator<InternalRow> batch;
+                try {
+                    batch = scanner.pollBatch(POLL_TIMEOUT);
+                } catch (IOException e) {
+                    throw new RuntimeException("Error polling batch from scanner", e);
+                }
+
+                if (batch == null) {
+                    // end of data: close the scanner now so it is released even if the caller
+                    // never calls close() after draining the iterator
+                    close();
+                } else if (batch.hasNext()) {
+                    currentBatch = batch;
+                } else {
+                    batch.close();
+                }
+            }
+        }
+
+        private void releaseCurrentBatch() {
+            if (currentBatch != null) {
+                currentBatch.close();
+                currentBatch = null;
+            }
+        }
+
+        @Override
+        public void close() {
+            if (isClosed) {
+                return;
+            }
+            isClosed = true;
+            releaseCurrentBatch();
+            try {
+                scanner.close();
+            } catch (IOException e) {
+                throw new RuntimeException("Error closing scanner", e);
+            }
+        }
+    }
+}
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java new file mode 100644 index 0000000000..b84419576f --- /dev/null +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvBatchScanner.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.KvFormat;
+import org.apache.fluss.metadata.SchemaGetter;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.record.DefaultValueRecordBatch;
+import org.apache.fluss.record.ValueRecord;
+import org.apache.fluss.record.ValueRecordReadContext;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.PbScanReqForBucket;
+import org.apache.fluss.rpc.messages.ScanKvRequest;
+import org.apache.fluss.rpc.messages.ScanKvResponse;
+import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest;
+import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.SchemaUtil;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A {@link BatchScanner} implementation that scans records from a primary key table.
+ *
+ * <p>The scanner talks to the leader of one {@link TableBucket} through {@code scanKv} requests.
+ * The first request opens a server-side scanner; follow-up requests carry the returned scanner
+ * id plus an increasing call sequence id. While a batch is handed to the caller the next request
+ * is already in flight.
+ *
+ * <p>{@link #pollBatch(Duration)} and {@link #close()} are meant to be called from one thread.
+ * The only state shared with the optional keep-alive thread ({@code scannerId}, {@code
+ * hasMoreResults}, {@code isClosed}) is volatile.
+ */
+public class KvBatchScanner implements BatchScanner {
+    private static final Logger LOG = LoggerFactory.getLogger(KvBatchScanner.class);
+
+    private final TableInfo tableInfo;
+    private final TableBucket tableBucket;
+    private final SchemaGetter schemaGetter;
+    private final MetadataUpdater metadataUpdater;
+    @Nullable private final int[] projectedFields;
+    @Nullable private final Long limit;
+    private final int targetSchemaId;
+    private final InternalRow.FieldGetter[] fieldGetters;
+    private final KvFormat kvFormat;
+    private final int batchSizeBytes;
+
+    /** Index mappings from an older record schema id to the target schema, built lazily. */
+    private final Map<Integer, int[]> schemaProjectionCache = new HashMap<>();
+
+    // Written by the polling thread, read by the keep-alive thread.
+    @Nullable private volatile byte[] scannerId;
+    private volatile boolean hasMoreResults = true;
+    private volatile boolean isClosed = false;
+
+    private int callSeqId = 0;
+    @Nullable private CompletableFuture<ScanKvResponse> inFlightRequest;
+    @Nullable private ScheduledExecutorService keepAliveExecutor;
+
+    public KvBatchScanner(
+            TableInfo tableInfo,
+            TableBucket tableBucket,
+            SchemaGetter schemaGetter,
+            MetadataUpdater metadataUpdater,
+            @Nullable int[] projectedFields,
+            @Nullable Long limit) {
+        this.tableInfo = tableInfo;
+        this.tableBucket = tableBucket;
+        this.schemaGetter = schemaGetter;
+        this.metadataUpdater = metadataUpdater;
+        this.projectedFields = projectedFields;
+        this.limit = limit;
+        this.targetSchemaId = tableInfo.getSchemaId();
+        this.kvFormat = tableInfo.getTableConfig().getKvFormat();
+
+        // NOTE(review): the option key is client-scoped but it is looked up in the table's
+        // configuration here - confirm this is the intended source.
+        long fetchMaxBytes =
+                tableInfo
+                        .getTableConfig()
+                        .getConfiguration()
+                        .get(ConfigOptions.CLIENT_SCANNER_KV_FETCH_MAX_BYTES)
+                        .getBytes();
+        // the request field is an int; clamp instead of letting the cast wrap around
+        this.batchSizeBytes = (int) Math.min(fetchMaxBytes, Integer.MAX_VALUE);
+
+        RowType rowType = tableInfo.getRowType();
+        this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            this.fieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i);
+        }
+    }
+
+    /**
+     * Waits up to {@code timeout} for the next batch of rows.
+     *
+     * @param timeout maximum time to wait for the in-flight response
+     * @return the rows of the next batch, an empty iterator if the response did not arrive in
+     *     time (the request stays in flight), or {@code null} once the scanner is closed or the
+     *     server reported that there are no more results
+     * @throws IOException if the request fails, the server answers with an error code, or the
+     *     calling thread is interrupted (the interrupt flag is restored)
+     */
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
+        if (isClosed || (!hasMoreResults && inFlightRequest == null)) {
+            return null;
+        }
+
+        try {
+            if (inFlightRequest == null) {
+                sendRequest();
+            }
+
+            ScanKvResponse response =
+                    inFlightRequest.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+            inFlightRequest = null;
+
+            if (response.hasErrorCode() && response.getErrorCode() != Errors.NONE.code()) {
+                throw Errors.forCode(response.getErrorCode()).exception(response.getErrorMessage());
+            }
+
+            this.scannerId = response.getScannerId();
+            this.hasMoreResults = response.isHasMoreResults();
+            this.callSeqId++;
+
+            List<InternalRow> rows = parseScanKvResponse(response);
+
+            // pipeline: send next request if there are more results
+            if (hasMoreResults) {
+                sendRequest();
+            }
+
+            return CloseableIterator.wrap(rows.iterator());
+        } catch (TimeoutException e) {
+            // not an error: the request is still pending and is awaited again on the next poll
+            return CloseableIterator.emptyIterator();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Sends the open request (no scanner id yet) or the next continuation request. */
+    private void sendRequest() {
+        int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket);
+        TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
+        if (gateway == null) {
+            throw new LeaderNotAvailableException(
+                    "Server " + leader + " is not found in metadata cache.");
+        }
+
+        ScanKvRequest request = new ScanKvRequest();
+        request.setBatchSizeBytes(batchSizeBytes);
+        byte[] currentScannerId = scannerId;
+        if (currentScannerId == null) {
+            PbScanReqForBucket bucketScanReq = request.setBucketScanReq();
+            bucketScanReq.setTableId(tableBucket.getTableId()).setBucketId(tableBucket.getBucket());
+            if (tableBucket.getPartitionId() != null) {
+                bucketScanReq.setPartitionId(tableBucket.getPartitionId());
+            }
+            if (limit != null) {
+                bucketScanReq.setLimit(limit);
+            }
+            request.setCallSeqId(0);
+        } else {
+            request.setScannerId(currentScannerId).setCallSeqId(callSeqId);
+        }
+
+        inFlightRequest = gateway.scanKv(request);
+    }
+
+    /** Decodes the value records of a response into rows of the target schema. */
+    private List<InternalRow> parseScanKvResponse(ScanKvResponse response) {
+        if (!response.hasRecords()) {
+            return Collections.emptyList();
+        }
+
+        ByteBuffer recordsBuffer = ByteBuffer.wrap(response.getRecords());
+        DefaultValueRecordBatch valueRecords =
+                DefaultValueRecordBatch.pointToByteBuffer(recordsBuffer);
+        ValueRecordReadContext readContext =
+                ValueRecordReadContext.createReadContext(schemaGetter, kvFormat);
+
+        List<InternalRow> scanRows = new ArrayList<>();
+        for (ValueRecord record : valueRecords.records(readContext)) {
+            InternalRow row = record.getRow();
+            int sourceSchemaId = record.schemaId();
+            if (sourceSchemaId != targetSchemaId) {
+                // record was written with another schema version: remap its columns
+                int[] indexMapping =
+                        schemaProjectionCache.computeIfAbsent(
+                                sourceSchemaId,
+                                id ->
+                                        SchemaUtil.getIndexMapping(
+                                                schemaGetter.getSchema(id),
+                                                schemaGetter.getSchema(targetSchemaId)));
+                row = ProjectedRow.from(indexMapping).replaceRow(row);
+            }
+            scanRows.add(maybeProject(row));
+        }
+        return scanRows;
+    }
+
+    /** Copies the row into a {@link GenericRow} and applies the column projection, if any. */
+    private InternalRow maybeProject(InternalRow originRow) {
+        GenericRow copiedRow = new GenericRow(fieldGetters.length);
+        for (int i = 0; i < fieldGetters.length; i++) {
+            copiedRow.setField(i, fieldGetters[i].getFieldOrNull(originRow));
+        }
+        if (projectedFields == null) {
+            return copiedRow;
+        }
+        ProjectedRow projectedRow = ProjectedRow.from(projectedFields);
+        projectedRow.replaceRow(copiedRow);
+        return projectedRow;
+    }
+
+    /**
+     * Starts a background task that pings the server-side scanner at a fixed rate. Calling it
+     * again, or after {@link #close()}, has no effect.
+     *
+     * @param keepAliveIntervalMs both the initial delay and the period, in milliseconds
+     */
+    public void startKeepAlivePeriodically(int keepAliveIntervalMs) {
+        if (isClosed || keepAliveExecutor != null) {
+            return;
+        }
+
+        keepAliveExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory("kv-scanner-keep-alive-" + tableBucket));
+        keepAliveExecutor.scheduleAtFixedRate(
+                this::sendKeepAlive,
+                keepAliveIntervalMs,
+                keepAliveIntervalMs,
+                TimeUnit.MILLISECONDS);
+    }
+
+    /** Best-effort keep-alive; failures are only logged so the schedule keeps running. */
+    private void sendKeepAlive() {
+        byte[] currentScannerId = scannerId;
+        if (isClosed || currentScannerId == null || !hasMoreResults) {
+            return;
+        }
+
+        try {
+            int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket);
+            TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
+            if (gateway != null) {
+                gateway.scannerKeepAlive(
+                        new ScannerKeepAliveRequest().setScannerId(currentScannerId));
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to send keep alive for scanner {}", tableBucket, e);
+        }
+    }
+
+    /**
+     * Stops the keep-alive task, cancels the pending request and, if the server-side scanner may
+     * still be open, asks the server to close it. The server-side close is best effort: a
+     * failure is logged and does not prevent the local cleanup.
+     */
+    @Override
+    public void close() throws IOException {
+        if (isClosed) {
+            return;
+        }
+        isClosed = true;
+
+        if (keepAliveExecutor != null) {
+            keepAliveExecutor.shutdownNow();
+        }
+
+        // release local resources first so a failing close request cannot skip this step
+        if (inFlightRequest != null) {
+            inFlightRequest.cancel(true);
+            inFlightRequest = null;
+        }
+
+        byte[] currentScannerId = scannerId;
+        if (currentScannerId != null && hasMoreResults) {
+            // Close scanner on server
+            try {
+                int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket);
+                TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
+                if (gateway != null) {
+                    gateway.scanKv(
+                            new ScanKvRequest()
+                                    .setScannerId(currentScannerId)
+                                    .setCallSeqId(callSeqId)
+                                    .setCloseScanner(true));
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to close scanner on server for {}", tableBucket, e);
+            }
+        }
+    }
+}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java new file mode 100644 index 0000000000..3754ff3880 --- /dev/null +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotQueryITCase.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.client.admin.ClientToServerITCaseBase;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.InternalRowAssert.assertThatRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for snapshot query. */
+public class KvSnapshotQueryITCase extends ClientToServerITCaseBase {
+
+    /** Schema used by the non-partitioned tests: {@code id INT} (primary key), {@code name STRING}. */
+    private static final Schema ID_NAME_SCHEMA =
+            Schema.newBuilder()
+                    .column("id", DataTypes.INT())
+                    .column("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .build();
+
+    @BeforeEach
+    protected void setup() throws Exception {
+        super.setup();
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        super.teardown();
+    }
+
+    @Test
+    void testBasicScan() throws Exception {
+        verifyThreeRowSnapshot("test_basic_scan");
+    }
+
+    @Test
+    void testMultiBucketScan() throws Exception {
+        // 100 rows spread over 3 buckets
+        verifySequentialSnapshot("test_multi_bucket_scan", 3, 100);
+    }
+
+    @Test
+    void testPartitionedTableScan() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_partitioned_scan");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("p", DataTypes.STRING())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id", "p")
+                        .build();
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .partitionedBy("p")
+                        .distributedBy(1, "id")
+                        .build();
+
+        createTable(tablePath, descriptor, true);
+        admin.createPartition(
+                        tablePath, new PartitionSpec(Collections.singletonMap("p", "p1")), false)
+                .get();
+        admin.createPartition(
+                        tablePath, new PartitionSpec(Collections.singletonMap("p", "p2")), false)
+                .get();
+
+        Table table = conn.getTable(tablePath);
+
+        UpsertWriter writer = table.newUpsert().createWriter();
+        writer.upsert(row(1, "p1", "a1"));
+        writer.upsert(row(2, "p1", "b1"));
+        writer.upsert(row(1, "p2", "a2"));
+        writer.flush();
+
+        List<InternalRow> result = snapshotQueryAll(table);
+        assertThat(result).hasSize(3);
+    }
+
+    @Test
+    void testLargeDataScan() throws Exception {
+        // 10k records in a single bucket
+        verifySequentialSnapshot("test_large_data_scan", 1, 10000);
+    }
+
+    @Test
+    void testSnapshotQuery() throws Exception {
+        verifyThreeRowSnapshot("test_snapshot_query");
+    }
+
+    /** Creates a primary key table with {@link #ID_NAME_SCHEMA} in {@code test_db}. */
+    private Table createIdNameTable(String tableName, int numBuckets) throws Exception {
+        TablePath tablePath = TablePath.of("test_db", tableName);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(ID_NAME_SCHEMA)
+                        .distributedBy(numBuckets, "id")
+                        .build();
+        createTable(tablePath, descriptor, true);
+        return conn.getTable(tablePath);
+    }
+
+    /** Writes three rows into a single-bucket table and checks the snapshot query returns them. */
+    private void verifyThreeRowSnapshot(String tableName) throws Exception {
+        Table table = createIdNameTable(tableName, 1);
+
+        // 1. write data
+        UpsertWriter writer = table.newUpsert().createWriter();
+        writer.upsert(row(1, "a"));
+        writer.upsert(row(2, "b"));
+        writer.upsert(row(3, "c"));
+        writer.flush();
+
+        // 2. test the snapshotQuery works as expected
+        List<InternalRow> result = snapshotQueryAll(table);
+
+        assertThat(result).hasSize(3);
+        result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        assertThatRow(result.get(0))
+                .withSchema(ID_NAME_SCHEMA.getRowType())
+                .isEqualTo(row(1, "a"));
+        assertThatRow(result.get(1))
+                .withSchema(ID_NAME_SCHEMA.getRowType())
+                .isEqualTo(row(2, "b"));
+        assertThatRow(result.get(2))
+                .withSchema(ID_NAME_SCHEMA.getRowType())
+                .isEqualTo(row(3, "c"));
+    }
+
+    /**
+     * Writes rows {@code (i, "val" + i)} for {@code i} in {@code [0, rowCount)} and checks that
+     * the snapshot query returns exactly those rows.
+     */
+    private void verifySequentialSnapshot(String tableName, int numBuckets, int rowCount)
+            throws Exception {
+        Table table = createIdNameTable(tableName, numBuckets);
+
+        // 1. write data
+        UpsertWriter writer = table.newUpsert().createWriter();
+        for (int i = 0; i < rowCount; i++) {
+            writer.upsert(row(i, "val" + i));
+        }
+        writer.flush();
+
+        // 2. scan all buckets and verify
+        List<InternalRow> result = snapshotQueryAll(table);
+
+        assertThat(result).hasSize(rowCount);
+        result.sort(Comparator.comparingInt(r -> r.getInt(0)));
+        for (int i = 0; i < rowCount; i++) {
+            assertThatRow(result.get(i))
+                    .withSchema(ID_NAME_SCHEMA.getRowType())
+                    .isEqualTo(row(i, "val" + i));
+        }
+    }
+
+    /** Drains a whole-table snapshot query into a list, closing the iterator afterwards. */
+    private List<InternalRow> snapshotQueryAll(Table table) throws Exception {
+        List<InternalRow> allRows = new ArrayList<>();
+        try (CloseableIterator<InternalRow> iterator = table.newSnapshotQuery().execute()) {
+            while (iterator.hasNext()) {
+                allRows.add(iterator.next());
+            }
+        }
+        return allRows;
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 7e04c3a1cc..b429856d56 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -439,6 +439,14 @@ public class ConfigOptions { + WRITER_ID_EXPIRATION_TIME.key() + " passing. The default value is 10 minutes."); + public static final ConfigOption SERVER_SCANNER_TTL = + key("server.scanner.ttl") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "The time that the tablet server will wait without receiving any scan request from " + + "a client before expiring the related status. The default value is 10 minutes."); + public static final ConfigOption TABLET_SERVER_CONTROLLED_SHUTDOWN_MAX_RETRIES = key("tablet-server.controlled-shutdown.max-retries") .intType() @@ -1093,6 +1101,14 @@ public class ConfigOptions { + "will still be returned to ensure that the fetch can make progress.
As such, " + "this is not a absolute maximum."); + public static final ConfigOption CLIENT_SCANNER_KV_FETCH_MAX_BYTES = + key("client.scanner.kv.fetch.max-bytes") + .memoryType() + .defaultValue(MemorySize.parse("1mb")) + .withDescription( + "The maximum amount of data the server should return for a kv scan request. " + + "The default value is 1mb."); + public static final ConfigOption CLIENT_SCANNER_LOG_FETCH_MAX_BYTES_FOR_BUCKET = key("client.scanner.log.fetch.max-bytes-for-bucket") .memoryType() diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 984a1def4a..b972a80ff8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -50,6 +50,11 @@ public TableConfig(Configuration config) { this.config = config; } + /** Gets the table properties configuration. */ + public Configuration getConfiguration() { + return config; + } + /** Gets the replication factor of the table. */ public int getReplicationFactor() { return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR); diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java new file mode 100644 index 0000000000..e146cb3b09 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ScannerNotFoundException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when a scanner is not found. */ +public class ScannerNotFoundException extends ApiException { + private static final long serialVersionUID = 1L; + + public ScannerNotFoundException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java index 578b74e5e2..436637edd0 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java @@ -42,6 +42,10 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -130,6 +134,24 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.LIMIT_SCAN) CompletableFuture limitScan(LimitScanRequest request); + /** + * Scan kv data from the specified table bucket. 
+ * + * @param request the scan kv request + * @return the scan kv response + */ + @RPC(api = ApiKeys.SCAN_KV) + CompletableFuture scanKv(ScanKvRequest request); + + /** + * Keep alive for the specified scanner. + * + * @param request the scanner keep alive request + * @return the scanner keep alive response + */ + @RPC(api = ApiKeys.SCANNER_KEEP_ALIVE) + CompletableFuture scannerKeepAlive(ScannerKeepAliveRequest request); + /** * List offsets for the specified table bucket. * diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index cc033ba8a9..b72edebf75 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -80,7 +80,9 @@ public enum ApiKeys { REBALANCE(1049, 0, 0, PUBLIC), LIST_REBALANCE_PROGRESS(1050, 0, 0, PUBLIC), CANCEL_REBALANCE(1051, 0, 0, PUBLIC), - PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE); + PREPARE_LAKE_TABLE_SNAPSHOT(1052, 0, 0, PRIVATE), + SCAN_KV(1053, 0, 0, PUBLIC), + SCANNER_KEEP_ALIVE(1054, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 5ee652cce2..a6aa827cb6 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -64,6 +64,7 @@ import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; +import org.apache.fluss.exception.ScannerNotFoundException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; @@ -240,7 
+241,8 @@ public enum Errors { SEVER_TAG_NOT_EXIST_EXCEPTION(60, "The server tag not exist.", ServerTagNotExistException::new), REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new), NO_REBALANCE_IN_PROGRESS_EXCEPTION( - 62, "No rebalance task in progress.", NoRebalanceInProgressException::new); + 62, "No rebalance task in progress.", NoRebalanceInProgressException::new), + SCANNER_NOT_FOUND_EXCEPTION(63, "The scanner is not found.", ScannerNotFoundException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..d828cb6105 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -255,6 +255,84 @@ message PrefixLookupResponse { } +// scan kv request and response +message ScanKvRequest { + // If continuing an existing scan, then you must set scanner_id. + // Otherwise, you must set 'new_scan_request'. + optional bytes scanner_id = 1; + optional PbScanReqForBucket bucket_scan_req = 2; + + // The sequence ID of this call. The sequence ID should start at 0 + // with the request for a new scanner, and after each successful request, + // the client should increment it by 1. When retrying a request, the client + // should _not_ increment this value. If the server detects that the client + // missed a chunk of rows from the middle of a scan, it will respond with an + // error. + optional uint32 call_seq_id = 3; + + // The maximum number of bytes to send in the response. + optional uint32 batch_size_bytes = 4; + + // If set, the server will close the scanner after responding to + // this request, regardless of whether all rows have been delivered. + optional bool close_scanner = 5; +} + +message PbScanReqForBucket { + // The tablet to scan. 
+ required int64 table_id = 1; + optional int64 partition_id = 2; + required int32 bucket_id = 3; + + // The maximum number of rows to scan with the new scanner. + // + // The scanner will automatically stop yielding results and close itself + // after reaching this number of result rows. + optional uint64 limit = 4; +} + +message ScanKvResponse { + // The error, if an error occurred with this request. + optional int32 error_code = 1; + optional string error_message = 2; + + // When a scanner is created, returns the scanner ID which may be used + // to pull new rows from the scanner. + optional bytes scanner_id = 3; + + // Set to true to indicate that there may be further results to be fetched + // from this scanner. If the scanner has no more results, then the scanner + // ID will become invalid and cannot continue to be used. + // + // Note that if a scan returns no results, then the initial response from + // the first RPC may return false in this flag, in which case there will + // be no scanner ID assigned. + optional bool has_more_results = 4; + + // The block of returned rows. + // + // NOTE: the schema-related fields will not be present in this row block. + // The schema will match the schema requested by the client when it created + // the scanner. + optional bytes records = 5; + + // Returns the corresponding log offset at the time the scanner is created + optional int64 log_offset = 6; +} + +// A scanner keep-alive request. +// Updates the scanner access time, increasing its time-to-live. +message ScannerKeepAliveRequest { + required bytes scanner_id = 1; +} + +message ScannerKeepAliveResponse { + // The error, if an error occurred with this request. 
+ optional int32 error_code = 1; + optional string error_message = 2; +} + + // limit scan request and response message LimitScanRequest { required int64 table_id = 2; diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 7db3654383..7503c3379a 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -71,6 +71,10 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -135,6 +139,17 @@ public CompletableFuture limitScan(LimitScanRequest request) return null; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + return null; + } + + @Override + public CompletableFuture scannerKeepAlive( + ScannerKeepAliveRequest request) { + return null; + } + @Override public CompletableFuture listOffsets(ListOffsetsRequest request) { return null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java index f3998f4435..d7ab654151 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rocksdb/RocksDBKv.java @@ -32,6 +32,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import 
org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; import org.rocksdb.Statistics; import org.rocksdb.WriteOptions; @@ -151,6 +152,18 @@ public List limitScan(Integer limit) { return pkList; } + public Snapshot getSnapshot() { + return db.getSnapshot(); + } + + public void releaseSnapshot(Snapshot snapshot) { + db.releaseSnapshot(snapshot); + } + + public RocksIterator newIterator(ReadOptions readOptions) { + return db.newIterator(defaultColumnFamilyHandle, readOptions); + } + public void put(byte[] key, byte[] value) throws IOException { try { db.put(writeOptions, key, value); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java new file mode 100644 index 0000000000..e74097d1f6 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.utils.clock.Clock; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; + +import javax.annotation.concurrent.NotThreadSafe; + +/** The context for a scanner. */ +@NotThreadSafe +public class ScannerContext implements AutoCloseable { + private final RocksDBKv rocksDBKv; + private final RocksIterator iterator; + private final ReadOptions readOptions; + private final Snapshot snapshot; + private final long limit; + + private int callSeqId; + private long lastAccessTime; + private long rowsScanned; + + public ScannerContext( + RocksDBKv rocksDBKv, + RocksIterator iterator, + ReadOptions readOptions, + Snapshot snapshot, + long limit, + Clock clock) { + this.rocksDBKv = rocksDBKv; + this.iterator = iterator; + this.readOptions = readOptions; + this.snapshot = snapshot; + this.limit = limit; + this.callSeqId = 0; + this.lastAccessTime = clock.milliseconds(); + this.rowsScanned = 0; + } + + public RocksIterator getIterator() { + return iterator; + } + + public Snapshot getSnapshot() { + return snapshot; + } + + public long getLimit() { + return limit; + } + + public int getCallSeqId() { + return callSeqId; + } + + public void setCallSeqId(int callSeqId) { + this.callSeqId = callSeqId; + } + + public long getLastAccessTime() { + return lastAccessTime; + } + + public void updateLastAccessTime(long lastAccessTime) { + this.lastAccessTime = lastAccessTime; + } + + public long getRowsScanned() { + return rowsScanned; + } + + public void incrementRowsScanned(long count) { + this.rowsScanned += count; + } + + @Override + public void close() { + iterator.close(); + readOptions.close(); + rocksDBKv.releaseSnapshot(snapshot); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java new 
file mode 100644 index 0000000000..4beac6e1d9 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.scan; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.rpc.protocol.Errors; +import org.apache.fluss.server.kv.KvTablet; +import org.apache.fluss.server.kv.rocksdb.RocksDBKv; +import org.apache.fluss.utils.AutoCloseableAsync; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.clock.SystemClock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.fluss.utils.concurrent.FutureUtils; + +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksIterator; +import org.rocksdb.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.TimeUnit; + +/** A manager for scanners. */ +public class ScannerManager implements AutoCloseableAsync { + private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class); + + private final Map scanners = MapUtils.newConcurrentHashMap(); + private final ScheduledExecutorService cleanupExecutor; + private final Clock clock; + private final long scannerTtlMs; + + public ScannerManager(Configuration conf) { + this(conf, SystemClock.getInstance()); + } + + public ScannerManager(Configuration conf, Clock clock) { + this.clock = clock; + this.scannerTtlMs = conf.get(ConfigOptions.SERVER_SCANNER_TTL).toMillis(); + this.cleanupExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("scanner-cleanup-thread")); + this.cleanupExecutor.scheduleWithFixedDelay( + this::cleanupExpiredScanners, + scannerTtlMs / 2, + scannerTtlMs / 2, + TimeUnit.MILLISECONDS); + } + + public byte[] createScanner(KvTablet kvTablet, long limit) { + RocksDBKv rocksDBKv = kvTablet.getRocksDBKv(); + Snapshot snapshot = rocksDBKv.getSnapshot(); + ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot); + RocksIterator iterator = rocksDBKv.newIterator(readOptions); + iterator.seekToFirst(); + + ScannerContext context = + new ScannerContext(rocksDBKv, iterator, readOptions, snapshot, limit, clock); + byte[] scannerId = generateScannerId(); + scanners.put(ByteBuffer.wrap(scannerId), context); + return scannerId; + } + + public ScannerContext getScanner(byte[] scannerId) { + ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } + return context; + } + + public void keepAlive(byte[] scannerId) { + ScannerContext context = scanners.get(ByteBuffer.wrap(scannerId)); + if (context != null) { + context.updateLastAccessTime(clock.milliseconds()); + } else { + throw Errors.SCANNER_NOT_FOUND_EXCEPTION.exception( + "Scanner not found: " + 
scannerIdToString(scannerId)); + } + } + + public void removeScanner(byte[] scannerId) { + ScannerContext context = scanners.remove(ByteBuffer.wrap(scannerId)); + if (context != null) { + closeScannerContext(context); + } + } + + private void cleanupExpiredScanners() { + long now = clock.milliseconds(); + for (Map.Entry entry : scanners.entrySet()) { + if (now - entry.getValue().getLastAccessTime() > scannerTtlMs) { + ScannerContext context = scanners.remove(entry.getKey()); + if (context != null) { + LOG.info( + "Scanner {} expired, closing it.", + scannerIdToString(entry.getKey().array())); + closeScannerContext(context); + } + } + } + } + + private void closeScannerContext(ScannerContext context) { + try { + context.close(); + } catch (Exception e) { + LOG.error("Fail to close scanner context.", e); + } + } + + private byte[] generateScannerId() { + return UUID.randomUUID().toString().replace("-", "").getBytes(StandardCharsets.UTF_8); + } + + private String scannerIdToString(byte[] scannerId) { + return new String(scannerId, StandardCharsets.UTF_8); + } + + @Override + public CompletableFuture closeAsync() { + try { + close(); + return CompletableFuture.completedFuture(null); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + @Override + public void close() throws Exception { + cleanupExecutor.shutdownNow(); + for (ScannerContext context : scanners.values()) { + closeScannerContext(context); + } + scanners.clear(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 8eed63c844..dc98e5b8ca 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -41,6 +41,7 @@ import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; import org.apache.fluss.server.coordinator.MetadataManager; 
import org.apache.fluss.server.kv.KvManager; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter; import org.apache.fluss.server.log.LogManager; import org.apache.fluss.server.log.remote.RemoteLogManager; @@ -125,6 +126,9 @@ public class TabletServer extends ServerBase { @GuardedBy("lock") private TabletService tabletService; + @GuardedBy("lock") + private ScannerManager scannerManager; + @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -230,6 +234,8 @@ protected void startServices() throws Exception { this.kvManager = KvManager.create(conf, zkClient, logManager, tabletServerMetricGroup); kvManager.startup(); + this.scannerManager = new ScannerManager(conf, clock); + // Register kvManager to dynamicConfigManager for dynamic reconfiguration dynamicConfigManager.register(kvManager); // Start dynamicConfigManager after all reconfigurable components are registered @@ -286,6 +292,7 @@ protected void startServices() throws Exception { metadataManager, authorizer, dynamicConfigManager, + scannerManager, ioExecutor); RequestsMetrics requestsMetrics = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 992b963334..66f6650d96 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -19,10 +19,14 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.exception.AuthorizationException; +import org.apache.fluss.exception.FlussRuntimeException; +import org.apache.fluss.exception.NonPrimaryKeyTableException; +import org.apache.fluss.exception.NotLeaderOrFollowerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.fs.FileSystem; import org.apache.fluss.metadata.TableBucket; import 
org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.DefaultValueRecordBatch; import org.apache.fluss.record.KvRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.rpc.entity.FetchLogResultForBucket; @@ -50,12 +54,17 @@ import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsRequest; import org.apache.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import org.apache.fluss.rpc.messages.PbScanReqForBucket; import org.apache.fluss.rpc.messages.PrefixLookupRequest; import org.apache.fluss.rpc.messages.PrefixLookupResponse; import org.apache.fluss.rpc.messages.ProduceLogRequest; import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.UpdateMetadataRequest; @@ -71,16 +80,23 @@ import org.apache.fluss.server.entity.FetchReqInfo; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.UserContext; +import org.apache.fluss.server.kv.scan.ScannerContext; +import org.apache.fluss.server.kv.scan.ScannerManager; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataProvider; +import org.apache.fluss.server.replica.Replica; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.utils.ServerRpcMessageUtils; import 
org.apache.fluss.server.zk.ZooKeeperClient; +import org.rocksdb.RocksIterator; + import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -127,6 +143,7 @@ public final class TabletService extends RpcServiceBase implements TabletServerG private final ReplicaManager replicaManager; private final TabletServerMetadataCache metadataCache; private final TabletServerMetadataProvider metadataFunctionProvider; + private final ScannerManager scannerManager; public TabletService( int serverId, @@ -137,6 +154,7 @@ public TabletService( MetadataManager metadataManager, @Nullable Authorizer authorizer, DynamicConfigManager dynamicConfigManager, + ScannerManager scannerManager, ExecutorService ioExecutor) { super( remoteFileSystem, @@ -149,6 +167,7 @@ public TabletService( this.serviceName = "server-" + serverId; this.replicaManager = replicaManager; this.metadataCache = metadataCache; + this.scannerManager = scannerManager; this.metadataFunctionProvider = new TabletServerMetadataProvider(zkClient, metadataManager, metadataCache); } @@ -285,6 +304,136 @@ public CompletableFuture limitScan(LimitScanRequest request) return response; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + CompletableFuture response = new CompletableFuture<>(); + try { + if (request.hasScannerId()) { + byte[] scannerId = request.getScannerId(); + ScannerContext context = scannerManager.getScanner(scannerId); + if (context == null) { + throw Errors.SCANNER_NOT_FOUND_EXCEPTION.exception( + "Scanner not found: " + new String(scannerId, StandardCharsets.UTF_8)); + } + + if (request.hasCloseScanner() && request.isCloseScanner()) { + scannerManager.removeScanner(scannerId); + ScanKvResponse scanResponse = new ScanKvResponse(); + scanResponse.setScannerId(scannerId).setHasMoreResults(false); + response.complete(scanResponse); + return response; + } + + // check 
call seq id + if (request.getCallSeqId() != context.getCallSeqId() + 1) { + throw new FlussRuntimeException( + "Out of order scan request. Expected call seq id: " + + (context.getCallSeqId() + 1) + + ", but got: " + + request.getCallSeqId()); + } + context.setCallSeqId(request.getCallSeqId()); + + response.complete(continueScan(scannerId, context, request.getBatchSizeBytes())); + } else { + PbScanReqForBucket bucketScanReq = request.getBucketScanReq(); + authorizeTable(READ, bucketScanReq.getTableId()); + + TableBucket tb = + new TableBucket( + bucketScanReq.getTableId(), + bucketScanReq.hasPartitionId() + ? bucketScanReq.getPartitionId() + : null, + bucketScanReq.getBucketId()); + Replica replica = replicaManager.getReplicaOrException(tb); + if (!replica.isLeader()) { + throw new NotLeaderOrFollowerException("Leader not local for bucket " + tb); + } + if (!replica.isKvTable()) { + throw new NonPrimaryKeyTableException( + "Table " + bucketScanReq.getTableId() + " is not a primary key table."); + } + + long limit = bucketScanReq.hasLimit() ? bucketScanReq.getLimit() : Long.MAX_VALUE; + byte[] scannerId = scannerManager.createScanner(replica.getKvTablet(), limit); + ScannerContext context = scannerManager.getScanner(scannerId); + + ScanKvResponse scanResponse = + continueScan(scannerId, context, request.getBatchSizeBytes()); + // The FIP says: "Returns the corresponding log offset at the time the scanner is + // created" + // We can use the high watermark or the current log end offset. 
+ scanResponse.setLogOffset(replica.getLogHighWatermark()); + response.complete(scanResponse); + } + } catch (Exception e) { + response.complete(makeScanKvErrorResponse(e)); + } + return response; + } + + private ScanKvResponse continueScan( + byte[] scannerId, ScannerContext context, int batchSizeBytes) throws IOException { + RocksIterator iterator = context.getIterator(); + DefaultValueRecordBatch.Builder builder = DefaultValueRecordBatch.builder(); + int currentBytes = 0; + long rowsScannedInThisBatch = 0; + + while (iterator.isValid() + && context.getRowsScanned() + rowsScannedInThisBatch < context.getLimit()) { + byte[] value = iterator.value(); + // Check if adding this record would exceed batch size. + // But we must add at least one record. + if (rowsScannedInThisBatch > 0 && currentBytes + value.length > batchSizeBytes) { + break; + } + + builder.append(value); + currentBytes += value.length; + rowsScannedInThisBatch++; + iterator.next(); + } + + context.incrementRowsScanned(rowsScannedInThisBatch); + boolean hasMore = iterator.isValid() && context.getRowsScanned() < context.getLimit(); + + ScanKvResponse response = new ScanKvResponse(); + response.setScannerId(scannerId).setHasMoreResults(hasMore); + if (rowsScannedInThisBatch > 0) { + DefaultValueRecordBatch batch = builder.build(); + byte[] records = new byte[batch.sizeInBytes()]; + batch.getSegment().get(0, records); + response.setRecords(records); + } + + if (!hasMore) { + scannerManager.removeScanner(scannerId); + } + + return response; + } + + private ScanKvResponse makeScanKvErrorResponse(Throwable e) { + ScanKvResponse response = new ScanKvResponse(); + ApiError error = ApiError.fromThrowable(e); + response.setErrorCode(error.error().code()).setErrorMessage(error.message()); + return response; + } + + @Override + public CompletableFuture scannerKeepAlive( + ScannerKeepAliveRequest request) { + ScannerKeepAliveResponse response = new ScannerKeepAliveResponse(); + try { + 
scannerManager.keepAlive(request.getScannerId()); + } catch (Exception e) { + ApiError error = ApiError.fromThrowable(e); + response.setErrorCode(error.error().code()).setErrorMessage(error.message()); + } + return CompletableFuture.completedFuture(response); + } + @Override public CompletableFuture notifyLeaderAndIsr( NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 500d197fcf..1c4d7f78c0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -81,6 +81,10 @@ import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvRequest; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.ScanKvRequest; +import org.apache.fluss.rpc.messages.ScanKvResponse; +import org.apache.fluss.rpc.messages.ScannerKeepAliveRequest; +import org.apache.fluss.rpc.messages.ScannerKeepAliveResponse; import org.apache.fluss.rpc.messages.StopReplicaRequest; import org.apache.fluss.rpc.messages.StopReplicaResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; @@ -204,6 +208,17 @@ public CompletableFuture limitScan(LimitScanRequest request) return null; } + @Override + public CompletableFuture scanKv(ScanKvRequest request) { + return null; + } + + @Override + public CompletableFuture scannerKeepAlive( + ScannerKeepAliveRequest request) { + return null; + } + @Override public CompletableFuture listOffsets(ListOffsetsRequest request) { return null;