From db6d0933e54d8e22648aa93ab676c3f0f74ee88b Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 15 Jan 2026 21:45:30 +0800 Subject: [PATCH] [server] Support projection pushdown with field id in server. --- .../log/DefaultCompletedFetchTest.java | 3 +- .../fluss/record/FileLogProjection.java | 137 +++++++++--- .../fluss/record/ProjectionPushdownCache.java | 27 ++- .../fluss/record/FileLogProjectionTest.java | 202 ++++++++++++++---- .../apache/fluss/testutils/DataTestUtils.java | 22 +- fluss-rpc/src/main/proto/FlussApi.proto | 2 + .../server/coordinator/SchemaUpdate.java | 4 +- .../fluss/server/entity/FetchReqInfo.java | 29 ++- .../apache/fluss/server/log/FetchParams.java | 3 +- .../fluss/server/replica/ReplicaManager.java | 3 +- .../server/utils/ServerRpcMessageUtils.java | 10 +- .../apache/fluss/server/kv/KvTabletTest.java | 8 +- .../fluss/server/log/FetchParamsTest.java | 4 + .../fluss/server/replica/ReplicaTest.java | 3 + .../server/tablet/TabletServiceITCase.java | 155 +++++++++++++- .../server/testutils/RpcMessageTestUtils.java | 15 ++ 16 files changed, 517 insertions(+), 110 deletions(-) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index 9f50fe59e4..dce27c9a46 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -427,7 +427,8 @@ private MemoryLogRecords genRecordsWithProjection( DATA2_TABLE_ID, testingSchemaGetter, DEFAULT_COMPRESSION, - projection.getProjectionInOrder()); + projection.getProjectionInOrder(), + false); ByteBuffer buffer = toByteBuffer( fileLogProjection diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java index 4717271c24..465d6758c2 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java @@ -37,6 +37,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Field; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.types.pojo.Schema; +import org.apache.fluss.types.DataField; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; import org.apache.fluss.utils.types.Tuple2; @@ -99,7 +100,9 @@ public class FileLogProjection { private SchemaGetter schemaGetter; private long tableId; private ArrowCompressionInfo compressionInfo; - private int[] selectedFieldPositions; + private int[] selectedFields; + // whether the selected fields are specified by field ids or field positions. 
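+ // When selecting by IDs, each value is matched against DataField#getFieldId() of the resolved
+ // schema; when selecting by positions, values index the top-level fields of its row type.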
+ private boolean isSelectedByIds; public FileLogProjection(ProjectionPushdownCache projectionsCache) { this.projectionsCache = projectionsCache; @@ -115,11 +118,13 @@ public void setCurrentProjection( long tableId, SchemaGetter schemaGetter, ArrowCompressionInfo compressionInfo, - int[] selectedFieldPositions) { + int[] selectedFields, + boolean isSelectedByIds) { this.tableId = tableId; this.schemaGetter = schemaGetter; this.compressionInfo = compressionInfo; - this.selectedFieldPositions = selectedFieldPositions; + this.selectedFields = selectedFields; + this.isSelectedByIds = isSelectedByIds; } /** @@ -317,16 +322,46 @@ private void resizeArrowMetadataBuffer(int metadataSize) { } /** Flatten fields by a pre-order depth-first traversal of the fields in the schema. */ - private void flattenFields( + private static void flattenFieldsByPosition( List arrowFields, - BitSet selectedFields, + BitSet selectedFieldPositions, List> flattenedFields) { for (int i = 0; i < arrowFields.size(); i++) { Field field = arrowFields.get(i); - boolean selected = selectedFields.get(i); + boolean selected = selectedFieldPositions.get(i); flattenedFields.add(Tuple2.of(field, selected)); List children = field.getChildren(); - flattenFields(children, fillBitSet(children.size(), selected), flattenedFields); + flattenFieldsByPosition( + children, fillBitSet(children.size(), selected), flattenedFields); + } + } + + private static void flattenFieldsById( + List arrowFields, + RowType rowType, + BitSet selectedFieldIds, + List> flattenedFields) { + List fields = rowType.getFields(); + for (int i = 0; i < fields.size(); i++) { + DataField flussField = fields.get(i); + Field arrowField = arrowFields.get(i); + int fieldId = flussField.getFieldId(); + boolean selected = selectedFieldIds.get(fieldId); + flattenedFields.add(Tuple2.of(arrowField, selected)); + + List arrowChildren = arrowField.getChildren(); + // if the field is selected, all the children are selected. 
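+ // (the Arrow children are appended by position with a fully-set bit set via flattenFieldsByPosition)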
+ if (selected) { + flattenFieldsByPosition( + arrowChildren, fillBitSet(arrowChildren.size(), true), flattenedFields); + } else if (flussField.getType() instanceof RowType) { + // if the field is not selected, search for the children in the flattened fields + flattenFieldsById( + arrowChildren, + (RowType) flussField.getType(), + selectedFieldIds, + flattenedFields); + } } } @@ -403,24 +438,32 @@ ByteBuffer getLogHeaderBuffer() { private ProjectionInfo getOrCreateProjectionInfo(short schemaId) { ProjectionInfo cachedProjection = - projectionsCache.getProjectionInfo(tableId, schemaId, selectedFieldPositions); + projectionsCache.getProjectionInfo( + tableId, schemaId, selectedFields, isSelectedByIds); if (cachedProjection == null) { - cachedProjection = createProjectionInfo(schemaId, selectedFieldPositions); + cachedProjection = createProjectionInfo(schemaId, selectedFields, isSelectedByIds); projectionsCache.setProjectionInfo( - tableId, schemaId, selectedFieldPositions, cachedProjection); + tableId, schemaId, selectedFields, isSelectedByIds, cachedProjection); } return cachedProjection; } - private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldPositions) { + private ProjectionInfo createProjectionInfo( + short schemaId, int[] selectedFields, boolean isSelectedByIds) { org.apache.fluss.metadata.Schema schema = schemaGetter.getSchema(schemaId); RowType rowType = schema.getRowType(); // initialize the projection util information Schema arrowSchema = ArrowUtils.toArrowSchema(rowType); - BitSet selection = toBitSet(arrowSchema.getFields().size(), selectedFieldPositions); - List> flattenedFields = new ArrayList<>(); - flattenFields(arrowSchema.getFields(), selection, flattenedFields); + List> flattenedFields = + isSelectedByIds + ? getTargetFlattenedFieldsByIds( + arrowSchema.getFields(), + rowType, + schema.getHighestFieldId(), + selectedFields) + : getTargetFlattenedFieldsByPositions( + arrowSchema.getFields(), selectedFields); int totalFieldNodes = flattenedFields.size(); int[] bufferLayoutCount = new int[totalFieldNodes]; BitSet nodesProjection = new BitSet(totalFieldNodes); @@ -442,18 +485,32 @@ private ProjectionInfo createProjectionInfo(short schemaId, int[] selectedFieldP } Schema projectedArrowSchema = - ArrowUtils.toArrowSchema(rowType.project(selectedFieldPositions)); + ArrowUtils.toArrowSchema( + isSelectedByIds + ? 
projectByIds(rowType, schema.getHighestFieldId(), selectedFields) + : rowType.project(selectedFields)); ArrowBodyCompression bodyCompression = CompressionUtil.createBodyCompression(compressionInfo.createCompressionCodec()); int metadataLength = ArrowUtils.estimateArrowMetadataLength(projectedArrowSchema, bodyCompression); return new ProjectionInfo( - nodesProjection, - buffersProjection, - bufferIndex, - metadataLength, - bodyCompression, - selectedFieldPositions); + nodesProjection, buffersProjection, bufferIndex, metadataLength, bodyCompression); + } + + private static List> getTargetFlattenedFieldsByPositions( + List arrowFields, int[] selectedFieldPositions) { + BitSet selection = toBitSet(arrowFields.size(), selectedFieldPositions); + List> flattenedFields = new ArrayList<>(); + flattenFieldsByPosition(arrowFields, selection, flattenedFields); + return flattenedFields; + } + + private static List> getTargetFlattenedFieldsByIds( + List arrowFields, RowType rowType, int highFieldId, int[] selectedFieldIds) { + BitSet selection = toBitSet(highFieldId + 1, selectedFieldIds); + List> flattenedFields = new ArrayList<>(); + flattenFieldsById(arrowFields, rowType, selection, flattenedFields); + return flattenedFields; } /** Projection pushdown information for a specific schema and selected fields. */ @@ -463,21 +520,18 @@ public static final class ProjectionInfo { final int bufferCount; final int arrowMetadataLength; final ArrowBodyCompression bodyCompression; - final int[] selectedFieldPositions; private ProjectionInfo( BitSet nodesProjection, BitSet buffersProjection, int bufferCount, int arrowMetadataLength, - ArrowBodyCompression bodyCompression, - int[] selectedFieldPositions) { + ArrowBodyCompression bodyCompression) { this.nodesProjection = nodesProjection; this.buffersProjection = buffersProjection; this.bufferCount = bufferCount; this.arrowMetadataLength = arrowMetadataLength; this.bodyCompression = bodyCompression; - this.selectedFieldPositions = selectedFieldPositions; } } @@ -514,4 +568,37 @@ public long bodyLength() { return bodyLength; } } + + @VisibleForTesting + public static RowType projectByIds(RowType rowType, int highFieldId, int[] projectFieldIds) { + BitSet selection = toBitSet(highFieldId + 1, projectFieldIds); + List projectedFields = new ArrayList<>(); + projectByIds(rowType, selection, projectedFields); + if (projectFieldIds.length != projectedFields.size()) { + throw new IllegalArgumentException( + String.format( + "The number of projected fields (%d) does not match the number of " + + "selected field IDs (%d). This usually indicates: " + + "(1) Some field IDs do not exist in the schema, or " + + "(2) Both a parent row field and its nested child fields are selected, " + + "which causes duplication. 
Selected field IDs: %s", + projectedFields.size(), + projectFieldIds.length, + Arrays.toString(projectFieldIds))); + } + return new RowType(projectedFields); + } + + private static void projectByIds( + RowType rowType, BitSet projectFieldIds, List projectedFields) { + List fields = rowType.getFields(); + for (DataField dataField : fields) { + boolean selected = projectFieldIds.get(dataField.getFieldId()); + if (selected) { + projectedFields.add(dataField); + } else if (dataField.getType() instanceof RowType) { + projectByIds((RowType) dataField.getType(), projectFieldIds, projectedFields); + } + } + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java index 71f12192b2..5215db9e27 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/ProjectionPushdownCache.java @@ -50,26 +50,33 @@ public ProjectionPushdownCache() { @Nullable public ProjectionInfo getProjectionInfo( - long tableId, short schemaId, int[] selectedFieldPositions) { - ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedFieldPositions); + long tableId, short schemaId, int[] selectedColumns, boolean isSelectedByIds) { + ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumns, isSelectedByIds); return projectionCache.getIfPresent(key); } public void setProjectionInfo( - long tableId, short schemaId, int[] selectedColumnIds, ProjectionInfo projectionInfo) { - ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumnIds); + long tableId, + short schemaId, + int[] selectedColumns, + boolean isSelectedByIds, + ProjectionInfo projectionInfo) { + ProjectionKey key = new ProjectionKey(tableId, schemaId, selectedColumns, isSelectedByIds); projectionCache.put(key, projectionInfo); } static final class ProjectionKey { private final long tableId; private final short schemaId; - private final int[] selectedColumnIds; + private final int[] selectedColumns; + private final boolean isSelectedByIds; - ProjectionKey(long tableId, short schemaId, int[] selectedColumnIds) { + ProjectionKey( + long tableId, short schemaId, int[] selectedColumns, boolean isSelectedByIds) { this.tableId = tableId; this.schemaId = schemaId; - this.selectedColumnIds = selectedColumnIds; + this.selectedColumns = selectedColumns; + this.isSelectedByIds = isSelectedByIds; } @Override @@ -80,12 +87,14 @@ public boolean equals(Object o) { ProjectionKey that = (ProjectionKey) o; return tableId == that.tableId && schemaId == that.schemaId - && Arrays.equals(selectedColumnIds, that.selectedColumnIds); + && Arrays.equals(selectedColumns, that.selectedColumns) + && isSelectedByIds == that.isSelectedByIds; } @Override public int hashCode() { - return Objects.hash(tableId, schemaId, Arrays.hashCode(selectedColumnIds)); + return Objects.hash( + tableId, schemaId, Arrays.hashCode(selectedColumns), isSelectedByIds); } } } diff --git a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java index b85e4db32f..6d169ae1e8 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java @@ -20,8 +20,10 @@ import org.apache.fluss.exception.InvalidColumnProjectionException; import 
org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.row.InternalRow; +import org.apache.fluss.types.DataTypes; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; @@ -32,12 +34,15 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.File; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.stream.Stream; @@ -58,6 +63,7 @@ /** Tests for {@link FileLogProjection}. */ class FileLogProjectionTest { + private static final Logger log = LoggerFactory.getLogger(FileLogProjectionTest.class); private @TempDir File tempDir; private TestingSchemaGetter testingSchemaGetter; @@ -67,8 +73,9 @@ void beforeEach() { testingSchemaGetter.updateLatestSchemaInfo(new SchemaInfo(TestData.DATA2_SCHEMA, 2)); } - @Test - void testSetCurrentProjection() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSetCurrentProjection(boolean isProjectedByIds) throws Exception { short schemaId = (short) 2; FileLogRecords recordsOfData2RowType = createFileLogRecords( @@ -87,10 +94,11 @@ void testSetCurrentProjection() throws Exception { projection, recordsOfData2RowType, new int[] {0, 2}, + isProjectedByIds, recordsOfData2RowType.sizeInBytes()); assertThat(cache.projectionCache.size()).isEqualTo(1); FileLogProjection.ProjectionInfo info1 = - cache.getProjectionInfo(1L, schemaId, new int[] {0, 2}); + cache.getProjectionInfo(1L, schemaId, new int[] {0, 2}, isProjectedByIds); assertThat(info1).isNotNull(); assertThat(info1.nodesProjection.stream().toArray()).isEqualTo(new int[] {0, 2}); // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7] @@ -102,10 +110,11 @@ void testSetCurrentProjection() throws Exception { projection, recordsOfData2RowType, new int[] {1}, + isProjectedByIds, recordsOfData2RowType.sizeInBytes()); assertThat(cache.projectionCache.size()).isEqualTo(2); FileLogProjection.ProjectionInfo info2 = - cache.getProjectionInfo(2L, schemaId, new int[] {1}); + cache.getProjectionInfo(2L, schemaId, new int[] {1}, isProjectedByIds); assertThat(info2).isNotNull(); assertThat(info2.nodesProjection.stream().toArray()).isEqualTo(new int[] {1}); // a int: [0,1] ; b string: [2,3,4] ; c string: [5,6,7] @@ -209,23 +218,24 @@ void testProjectionOldDataWithNewSchema() throws Exception { static Stream projectedFieldsArgs() { return Stream.of( - Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0, (short) 1), - Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V0, (short) 1), - Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V0, (short) 1), - Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V1, (short) 1), - Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V1, (short) 1), - Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V1, (short) 1), - Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0, (short) 2), - Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V0, (short) 2), - Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V0, (short) 2), - Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V1, 
(short) 2), - Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V1, (short) 2), - Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V1, (short) 2)); + Arguments.of(new int[] {0}, LOG_MAGIC_VALUE_V0, (short) 1, true), + Arguments.arguments(new int[] {1}, LOG_MAGIC_VALUE_V0, (short) 1, true), + Arguments.arguments(new int[] {0, 1}, LOG_MAGIC_VALUE_V0, (short) 1, true), + Arguments.of(new int[] {0}, LOG_MAGIC_VALUE_V1, (short) 1, false), + Arguments.arguments(new int[] {1}, LOG_MAGIC_VALUE_V1, (short) 1, false), + Arguments.arguments(new int[] {0, 1}, LOG_MAGIC_VALUE_V1, (short) 1, false), + Arguments.of(new int[] {0}, LOG_MAGIC_VALUE_V0, (short) 2, false), + Arguments.arguments(new int[] {1}, LOG_MAGIC_VALUE_V0, (short) 2, false), + Arguments.arguments(new int[] {0, 1}, LOG_MAGIC_VALUE_V0, (short) 2, false), + Arguments.of(new int[] {0}, LOG_MAGIC_VALUE_V1, (short) 2, true), + Arguments.arguments(new int[] {1}, LOG_MAGIC_VALUE_V1, (short) 2, true), + Arguments.arguments(new int[] {0, 1}, LOG_MAGIC_VALUE_V1, (short) 2, true)); } @ParameterizedTest @MethodSource("projectedFieldsArgs") - void testProject(int[] projectedFields, byte recordBatchMagic, short schemaId) + void testProject( + int[] projectedFields, byte recordBatchMagic, short schemaId, boolean isProjectedByIds) throws Exception { testingSchemaGetter.updateLatestSchemaInfo(new SchemaInfo(TestData.DATA1_SCHEMA, schemaId)); FileLogRecords fileLogRecords = @@ -242,6 +252,7 @@ void testProject(int[] projectedFields, byte recordBatchMagic, short schemaId) new FileLogProjection(new ProjectionPushdownCache()), fileLogRecords, projectedFields, + isProjectedByIds, Integer.MAX_VALUE); List allData = new ArrayList<>(); allData.addAll(TestData.DATA1); @@ -412,6 +423,84 @@ void testReadLogHeaderFullyOrFail() throws Exception { } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testNestedRowProjection(boolean isProjectedByIds) throws Exception { + int schemaId = 1; + final Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column( + "c", + DataTypes.ROW( + DataTypes.INT(), + DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()))) + .column("d", DataTypes.INT()) + .column("e", DataTypes.STRING()) + .build(); + List data = + Arrays.asList( + new Object[] { + 1, "a", new Object[] {11, new Object[] {111, "aa"}}, 1111, "aaa" + }, + new Object[] { + 2, "b", new Object[] {22, new Object[] {222, "bb"}}, 2222, "bbb" + }, + new Object[] { + 3, "c", new Object[] {33, new Object[] {333, "cc"}}, 3333, "ccc" + }, + new Object[] { + 4, "d", new Object[] {44, new Object[] {444, "dd"}}, 4444, "ddd" + }, + new Object[] { + 5, "e", new Object[] {55, new Object[] {555, "ee"}}, 5555, "eee" + }); + + int[] projectedFields = new int[] {0, 3, 4}; + List expected = + Arrays.asList( + new Object[] { + 1, + isProjectedByIds ? 11 : 1111, + isProjectedByIds ? new Object[] {111, "aa"} : "aaa" + }, + new Object[] { + 2, + isProjectedByIds ? 22 : 2222, + isProjectedByIds ? new Object[] {222, "bb"} : "bbb" + }, + new Object[] { + 3, + isProjectedByIds ? 33 : 3333, + isProjectedByIds ? new Object[] {333, "cc"} : "ccc" + }, + new Object[] { + 4, + isProjectedByIds ? 44 : 4444, + isProjectedByIds ? new Object[] {444, "dd"} : "ddd" + }, + new Object[] { + 5, + isProjectedByIds ? 55 : 5555, + isProjectedByIds ? 
new Object[] {555, "ee"} : "eee" + }); + + testingSchemaGetter.updateLatestSchemaInfo(new SchemaInfo(schema, 1)); + FileLogRecords fileLogRecords = + createFileLogRecords(schemaId, (byte) 1, schema.getRowType(), data); + List results = + doProjection( + 1L, + schemaId, + new FileLogProjection(new ProjectionPushdownCache()), + fileLogRecords, + projectedFields, + isProjectedByIds, + Integer.MAX_VALUE); + assertEquals(results, expected); + } + private FileLogRecords createFileWithLogHeader(byte magic, int length) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(length).order(ByteOrder.LITTLE_ENDIAN); buffer.position(MAGIC_OFFSET); @@ -471,14 +560,42 @@ private List doProjection( int[] projectedFields, int fetchMaxBytes) throws Exception { + return doProjection( + tableId, + schemaId, + projection, + fileLogRecords, + projectedFields, + false, + fetchMaxBytes); + } + + private List doProjection( + long tableId, + int schemaId, + FileLogProjection projection, + FileLogRecords fileLogRecords, + int[] projectedFields, + boolean isProjectedByIds, + int fetchMaxBytes) + throws Exception { projection.setCurrentProjection( - tableId, testingSchemaGetter, DEFAULT_COMPRESSION, projectedFields); + tableId, + testingSchemaGetter, + DEFAULT_COMPRESSION, + projectedFields, + isProjectedByIds); LogRecords project = projection.project( fileLogRecords.channel(), 0, fileLogRecords.sizeInBytes(), fetchMaxBytes); assertThat(project.sizeInBytes()).isLessThanOrEqualTo(fetchMaxBytes); - RowType rowType = testingSchemaGetter.getSchema(schemaId).getRowType(); - RowType projectedType = rowType.project(projectedFields); + Schema schema = testingSchemaGetter.getSchema(schemaId); + RowType rowType = schema.getRowType(); + RowType projectedType = + isProjectedByIds + ? FileLogProjection.projectByIds( + rowType, schema.getHighestFieldId(), projectedFields) + : rowType.project(projectedFields); List results = new ArrayList<>(); long expectedOffset = 0L; try (LogRecordReadContext context = @@ -491,24 +608,7 @@ private List doProjection( assertThat(record.getChangeType()).isEqualTo(ChangeType.APPEND_ONLY); InternalRow row = record.getRow(); assertThat(row.getFieldCount()).isEqualTo(projectedFields.length); - Object[] objs = new Object[projectedFields.length]; - for (int i = 0; i < projectedFields.length; i++) { - if (row.isNullAt(i)) { - objs[i] = null; - continue; - } - switch (projectedType.getTypeAt(i).getTypeRoot()) { - case INTEGER: - objs[i] = row.getInt(i); - break; - case STRING: - objs[i] = row.getString(i).toString(); - break; - default: - throw new IllegalArgumentException( - "Unsupported type: " + projectedType.getTypeAt(i)); - } - } + Object[] objs = getRow(projectedType, row); results.add(objs); expectedOffset++; } @@ -524,4 +624,30 @@ private static void assertEquals(List actual, List expected) assertThat(actual.get(i)).isEqualTo(expected.get(i)); } } + + private static Object[] getRow(RowType projectedType, InternalRow row) { + Object[] results = new Object[projectedType.getFieldCount()]; + for (int i = 0; i < projectedType.getFieldCount(); i++) { + if (row.isNullAt(i)) { + results[i] = null; + continue; + } + switch (projectedType.getTypeAt(i).getTypeRoot()) { + case INTEGER: + results[i] = row.getInt(i); + break; + case STRING: + results[i] = row.getString(i).toString(); + break; + case ROW: + RowType rowType = (RowType) projectedType.getTypeAt(i); + results[i] = getRow(rowType, row.getRow(i, rowType.getFieldCount())); + break; + default: + throw new IllegalArgumentException( + "Unsupported type: 
" + projectedType.getTypeAt(i)); + } + } + return results; + } } diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java index 8594cf224b..d7492a2c0f 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java @@ -726,14 +726,8 @@ private static void assertLogRecordsEqualsWithRowKind( RowType rowType, LogRecord logRecord, Tuple2 expectedFieldAndRowKind) { - DataType[] dataTypes = rowType.getChildren().toArray(new DataType[0]); - InternalRow.FieldGetter[] fieldGetter = new InternalRow.FieldGetter[dataTypes.length]; - for (int i = 0; i < dataTypes.length; i++) { - fieldGetter[i] = InternalRow.createFieldGetter(dataTypes[i], i); - } assertThat(logRecord.getChangeType()).isEqualTo(expectedFieldAndRowKind.f0); - assertRowValueEquals( - fieldGetter, dataTypes, logRecord.getRow(), expectedFieldAndRowKind.f1); + assertLogRecordsEqualsWithRowKind(rowType, logRecord.getRow(), expectedFieldAndRowKind.f1); } public static void assertLogRecordsEquals( @@ -742,6 +736,17 @@ public static void assertLogRecordsEquals( DEFAULT_SCHEMA_ID, rowType, logRecords, expectedValue, TEST_SCHEMA_GETTER); } + private static void assertLogRecordsEqualsWithRowKind( + RowType rowType, InternalRow row, Object[] expectVal) { + DataType[] dataTypes = rowType.getChildren().toArray(new DataType[0]); + InternalRow.FieldGetter[] fieldGetter = new InternalRow.FieldGetter[dataTypes.length]; + for (int i = 0; i < dataTypes.length; i++) { + fieldGetter[i] = InternalRow.createFieldGetter(dataTypes[i], i); + } + + assertRowValueEquals(fieldGetter, dataTypes, row, expectVal); + } + public static void assertLogRecordsEquals( RowType rowType, LogRecords logRecords, @@ -821,6 +826,9 @@ private static void assertRowValueEquals( if (field != null) { if (dataTypes[i].getTypeRoot() == DataTypeRoot.STRING) { assertThat(field).isEqualTo(BinaryString.fromString((String) expectVal[i])); + } else if (dataTypes[i].getTypeRoot() == DataTypeRoot.ROW) { + assertRowValueEquals( + (RowType) dataTypes[i], (InternalRow) field, (Object[]) expectVal[i]); } else { assertThat(field).isEqualTo(expectVal[i]); } diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index db9d614354..e49d65beda 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -702,6 +702,8 @@ message PbFetchLogReqForTable { required bool projection_pushdown_enabled = 2; repeated int32 projected_fields = 3 [packed = true]; repeated PbFetchLogReqForBucket buckets_req = 4; + // If true, projected_fields are interpreted as field IDs; if false, they are field positions. 
+ optional bool project_by_ids = 5; } message PbFetchLogReqForBucket { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index e3527cb886..e91b0ea5f0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -63,9 +63,7 @@ public SchemaUpdate(TableInfo tableInfo) { public Schema getSchema() { Schema.Builder builder = - Schema.newBuilder() - .fromColumns(columns) - .highestFieldId((short) highestFieldId.get()); + Schema.newBuilder().fromColumns(columns).highestFieldId(highestFieldId.get()); if (!primaryKeys.isEmpty()) { builder.primaryKey(primaryKeys); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/entity/FetchReqInfo.java b/fluss-server/src/main/java/org/apache/fluss/server/entity/FetchReqInfo.java index 0f8df68002..462739632d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/entity/FetchReqInfo.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/entity/FetchReqInfo.java @@ -30,19 +30,25 @@ public final class FetchReqInfo { private final long tableId; private final long fetchOffset; @Nullable private final int[] projectFields; + private final boolean isProjectByIds; - private int maxBytes; + private final int maxBytes; public FetchReqInfo(long tableId, long fetchOffset, int maxBytes) { - this(tableId, fetchOffset, maxBytes, null); + this(tableId, fetchOffset, maxBytes, null, false); } public FetchReqInfo( - long tableId, long fetchOffset, int maxBytes, @Nullable int[] projectFields) { + long tableId, + long fetchOffset, + int maxBytes, + @Nullable int[] projectFields, + boolean isProjectByIds) { this.tableId = tableId; this.fetchOffset = fetchOffset; this.maxBytes = maxBytes; this.projectFields = projectFields; + this.isProjectByIds = isProjectByIds; } public long getTableId() { @@ -53,14 +59,14 @@ public long getFetchOffset() { return fetchOffset; } - public void setFetchMaxBytes(int maxBytes) { - this.maxBytes = maxBytes; - } - public int getMaxBytes() { return maxBytes; } + public boolean isProjectByIds() { + return isProjectByIds; + } + @Nullable public int[] getProjectFields() { return projectFields; @@ -77,6 +83,8 @@ public String toString() { + maxBytes + ", projectionFields=" + Arrays.toString(projectFields) + + ", isProjectByIds=" + + isProjectByIds + '}'; } @@ -97,11 +105,14 @@ public boolean equals(Object o) { return false; } - return fetchOffset == fetchReqInfo.fetchOffset && maxBytes == fetchReqInfo.maxBytes; + return fetchOffset == fetchReqInfo.fetchOffset + && maxBytes == fetchReqInfo.maxBytes + && isProjectByIds == fetchReqInfo.isProjectByIds; } @Override public int hashCode() { - return Objects.hash(tableId, fetchOffset, maxBytes, Arrays.hashCode(projectFields)); + return Objects.hash( + tableId, fetchOffset, maxBytes, Arrays.hashCode(projectFields), isProjectByIds); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java index 0ec42a0903..3b26d59b8a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/FetchParams.java @@ -101,6 +101,7 @@ public void setCurrentFetch( SchemaGetter schemaGetter, ArrowCompressionInfo compressionInfo, @Nullable int[] projectedFields, + boolean 
isSelectedByIds, ProjectionPushdownCache projectionCache) { this.fetchOffset = fetchOffset; this.maxFetchBytes = maxFetchBytes; @@ -111,7 +112,7 @@ public void setCurrentFetch( } fileLogProjection.setCurrentProjection( - tableId, schemaGetter, compressionInfo, projectedFields); + tableId, schemaGetter, compressionInfo, projectedFields, isSelectedByIds); } else { projectionEnabled = false; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 421db4a068..02f3a5e44e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -153,7 +153,6 @@ public class ReplicaManager { private final Map allReplicas = MapUtils.newConcurrentHashMap(); private final TabletServerMetadataCache metadataCache; - private final ExecutorService ioExecutor; private final ProjectionPushdownCache projectionsCache = new ProjectionPushdownCache(); private final Lock replicaStateChangeLock = new ReentrantLock(); @@ -292,7 +291,6 @@ public ReplicaManager( this.serverMetricGroup = serverMetricGroup; this.userMetrics = userMetrics; this.clock = clock; - this.ioExecutor = ioExecutor; registerMetrics(); } @@ -1113,6 +1111,7 @@ public Map readFromLog( replica.getSchemaGetter(), replica.getArrowCompressionInfo(), fetchReqInfo.getProjectFields(), + fetchReqInfo.isProjectByIds(), projectionsCache); LogReadInfo readInfo = replica.fetchRecords(fetchParams); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f174f727a4..bee9794af0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -822,6 +822,13 @@ public static Map getFetchLogData(FetchLogRequest req projectionFields = null; } + final boolean isSelectedByIds; + if (fetchLogReqForTable.hasProjectByIds()) { + isSelectedByIds = fetchLogReqForTable.isProjectByIds(); + } else { + isSelectedByIds = false; + } + List bucketsReqsList = fetchLogReqForTable.getBucketsReqsList(); for (PbFetchLogReqForBucket fetchLogReqForBucket : bucketsReqsList) { int bucketId = fetchLogReqForBucket.getBucketId(); @@ -836,7 +843,8 @@ public static Map getFetchLogData(FetchLogRequest req tableId, fetchLogReqForBucket.getFetchOffset(), fetchLogReqForBucket.getMaxFetchBytes(), - projectionFields)); + projectionFields, + isSelectedByIds)); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 34f50691a8..f9e975d969 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -732,7 +732,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( - 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); + 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}, true); } RowType readLogRowType = doProjection ? 
rowType.project(new int[] {0}) : rowType; @@ -828,7 +828,7 @@ void testFirstRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( - 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); + 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}, true); } // schema evolution case 1 :insert with data with schema 2 @@ -916,7 +916,7 @@ void testVersionRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( - 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); + 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}, true); } RowType readLogRowType = doProjection ? rowType.project(new int[] {0}) : rowType; @@ -1036,7 +1036,7 @@ void testVersionRowMergeEngine(boolean doProjection) throws Exception { if (doProjection) { logProjection = new FileLogProjection(projectionCache); logProjection.setCurrentProjection( - 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}); + 0L, schemaGetter, DEFAULT_COMPRESSION, new int[] {0}, true); } // schema evolution case 1 :insert with data with new schema diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java index d4c2be57b8..e3b9391dac 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/FetchParamsTest.java @@ -42,6 +42,7 @@ void testSetCurrentFetch() { new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, null, + false, projectionCache); assertThat(fetchParams.fetchOffset()).isEqualTo(20L); assertThat(fetchParams.maxFetchBytes()).isEqualTo(1024); @@ -54,6 +55,7 @@ void testSetCurrentFetch() { new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, new int[] {0, 2}, + false, projectionCache); assertThat(fetchParams.fetchOffset()).isEqualTo(30L); assertThat(fetchParams.maxFetchBytes()).isEqualTo(512); @@ -68,6 +70,7 @@ void testSetCurrentFetch() { new TestingSchemaGetter(new SchemaInfo(TestData.DATA1_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, null, + false, projectionCache); assertThat(fetchParams.projection()).isNull(); @@ -78,6 +81,7 @@ void testSetCurrentFetch() { new TestingSchemaGetter(new SchemaInfo(TestData.DATA2_SCHEMA, (short) 1)), DEFAULT_COMPRESSION, new int[] {0, 2}, + false, projectionCache); // the FileLogProjection should be cached assertThat(fetchParams.projection()).isNotNull().isSameAs(prevProjection); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index c8e26ea7b5..35c6816875 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -146,6 +146,7 @@ void testAppendRecordsToLeader() throws Exception { schemaGetter, DEFAULT_COMPRESSION, null, + true, projectionCache); LogReadInfo logReadInfo = logReplica.fetchRecords(fetchParams); assertLogRecordsEquals( @@ -172,6 +173,7 @@ void testAppendRecordsToLeader() throws Exception { schemaGetter, DEFAULT_COMPRESSION, null, + true, projectionCache); logReadInfo = logReplica.fetchRecords(fetchParams); assertLogRecordsEquals( @@ -818,6 +820,7 @@ private static 
LogRecords fetchRecords(Replica replica, long offset) throws IOEx replica.getSchemaGetter(), DEFAULT_COMPRESSION, null, + true, new ProjectionPushdownCache()); LogReadInfo logReadInfo = replica.fetchRecords(fetchParams); return logReadInfo.getFetchedData().getRecords(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 5a28f9e2be..e1d0150c12 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -29,6 +29,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.DefaultKvRecordBatch; import org.apache.fluss.record.DefaultValueRecordBatch; +import org.apache.fluss.record.FileLogProjection; import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.row.encode.CompactedKeyEncoder; import org.apache.fluss.row.encode.ValueEncoder; @@ -65,6 +66,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -80,6 +83,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Stream; import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; @@ -217,8 +221,8 @@ void testProduceLogResponseReturnInOrder() throws Exception { } @ParameterizedTest - @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) - void testFetchLog(byte recordBatchMagic) throws Exception { + @MethodSource("projectedFieldsArgs") + void testFetchLog(byte recordBatchMagic, boolean isProjectByIds) throws Exception { long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR); TableBucket tb = new TableBucket(tableId, 0); @@ -245,7 +249,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { // fetch from this bucket from offset 0, return data1. assertFetchLogResponse( - leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, 0, 0L)).get(), + leaderGateWay + .fetchLog(newFetchLogRequest(-1, tableId, 0, 0L, null, isProjectByIds)) + .get(), tableId, 0, 10L, @@ -253,7 +259,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { // fetch from this bucket from offset 3, return data1. assertFetchLogResponse( - leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, 0, 3L)).get(), + leaderGateWay + .fetchLog(newFetchLogRequest(-1, tableId, 0, 3L, null, isProjectByIds)) + .get(), tableId, 0, 10L, @@ -275,7 +283,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { // fetch this bucket from offset 10, return data2. assertFetchLogResponse( - leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, 0, 10L)).get(), + leaderGateWay + .fetchLog(newFetchLogRequest(-1, tableId, 0, 10L, null, isProjectByIds)) + .get(), tableId, 0, 20L, @@ -283,7 +293,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { // fetch this bucket from offset 100, return error code. 
assertFetchLogResponse( - leaderGateWay.fetchLog(newFetchLogRequest(-1, tableId, 0, 100L)).get(), + leaderGateWay + .fetchLog(newFetchLogRequest(-1, tableId, 0, 100L, null, isProjectByIds)) + .get(), tableId, 0, Errors.LOG_OFFSET_OUT_OF_RANGE_EXCEPTION.code(), @@ -298,7 +310,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { } assertFetchLogResponse( leaderGateWay - .fetchLog(newFetchLogRequest(-1, tableId, 0, 10L, new int[] {0})) + .fetchLog( + newFetchLogRequest( + -1, tableId, 0, 10L, new int[] {0}, isProjectByIds)) .get(), DATA1_ROW_TYPE.project(new int[] {0}), schemaGetter, @@ -314,7 +328,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { } assertFetchLogResponse( leaderGateWay - .fetchLog(newFetchLogRequest(-1, tableId, 0, 15L, new int[] {1})) + .fetchLog( + newFetchLogRequest( + -1, tableId, 0, 15L, new int[] {1}, isProjectByIds)) .get(), DATA1_ROW_TYPE.project(new int[] {1}), schemaGetter, @@ -325,7 +341,9 @@ void testFetchLog(byte recordBatchMagic) throws Exception { assertFetchLogResponse( leaderGateWay - .fetchLog(newFetchLogRequest(-1, tableId, 0, 10L, new int[] {2, 3})) + .fetchLog( + newFetchLogRequest( + -1, tableId, 0, 10L, new int[] {2, 3}, isProjectByIds)) .get(), tableId, 0, @@ -365,7 +383,7 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { leaderGateWay .fetchLog( newFetchLogRequest( - -1, tableId, 0, 0L, null, 1, Integer.MAX_VALUE, 100)) + -1, tableId, 0, 0L, null, false, 1, Integer.MAX_VALUE, 100)) .get(); assertThat(fetchLogResponse.getTablesRespsCount()).isEqualTo(1); fetchLogRespForTable = fetchLogResponse.getTablesRespsList().get(0); @@ -385,6 +403,7 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { 0, 0L, null, + false, 1, Integer.MAX_VALUE, (int) Duration.ofMinutes(5).toMillis())); @@ -411,6 +430,7 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { 0, 0L, null, + false, 1, Integer.MAX_VALUE, (int) Duration.ofMinutes(5).toMillis())) @@ -421,6 +441,113 @@ void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { DATA1); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testFetchLogWithNestedRowProjectionPushdown(boolean isProjectedByIds) throws Exception { + final Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column( + "c", + DataTypes.ROW( + DataTypes.INT(), + DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()))) + .column("d", DataTypes.INT()) + .column("e", DataTypes.STRING()) + .build(); + RowType rowType = schema.getRowType(); + List data = + Arrays.asList( + new Object[] { + 1, "a", new Object[] {11, new Object[] {111, "aa"}}, 1111, "aaa" + }, + new Object[] { + 2, "b", new Object[] {22, new Object[] {222, "bb"}}, 2222, "bbb" + }, + new Object[] { + 3, "c", new Object[] {33, new Object[] {333, "cc"}}, 3333, "ccc" + }, + new Object[] { + 4, "d", new Object[] {44, new Object[] {444, "dd"}}, 4444, "ddd" + }, + new Object[] { + 5, "e", new Object[] {55, new Object[] {555, "ee"}}, 5555, "eee" + }); + + int[] projectedFields = new int[] {0, 3, 4}; + List expected = + Arrays.asList( + new Object[] { + 1, + isProjectedByIds ? 11 : 1111, + isProjectedByIds ? new Object[] {111, "aa"} : "aaa" + }, + new Object[] { + 2, + isProjectedByIds ? 22 : 2222, + isProjectedByIds ? new Object[] {222, "bb"} : "bbb" + }, + new Object[] { + 3, + isProjectedByIds ? 33 : 3333, + isProjectedByIds ? new Object[] {333, "cc"} : "ccc" + }, + new Object[] { + 4, + isProjectedByIds ? 44 : 4444, + isProjectedByIds ? 
new Object[] {444, "dd"} : "ddd" + }, + new Object[] { + 5, + isProjectedByIds ? 55 : 5555, + isProjectedByIds ? new Object[] {555, "ee"} : "eee" + }); + + long tableId = + createTable( + FLUSS_CLUSTER_EXTENSION, + DATA1_TABLE_PATH, + TableDescriptor.builder().schema(schema).distributedBy(3).build()); + TableBucket tb = new TableBucket(tableId, 0); + SchemaGetter schemaGetter = new TestingSchemaGetter(1, schema); + + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + // produce one batch to this bucket. + assertProduceLogResponse( + leaderGateWay + .produceLog( + newProduceLogRequest( + tableId, + 0, + 1, + genMemoryLogRecordsByObject(rowType, 1, (byte) 1, data))) + .get(), + 0, + 0L); + + assertFetchLogResponse( + leaderGateWay + .fetchLog( + newFetchLogRequest( + -1, tableId, 0, 0L, projectedFields, isProjectedByIds)) + .get(), + isProjectedByIds + ? FileLogProjection.projectByIds( + rowType, schema.getHighestFieldId(), projectedFields) + : rowType.project(projectedFields), + schemaGetter, + tableId, + 0, + 5L, + expected); + } + @Test void testInvalidFetchLog() throws Exception { long tableId = @@ -973,4 +1100,12 @@ private NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest( return ServerRpcMessageUtils.makeNotifyLeaderAndIsrRequest( 0, Collections.singletonList(reqForBucket)); } + + static Stream projectedFieldsArgs() { + return Stream.of( + Arguments.of(LOG_MAGIC_VALUE_V0, true), + Arguments.arguments(LOG_MAGIC_VALUE_V0, false), + Arguments.of(LOG_MAGIC_VALUE_V1, true), + Arguments.arguments(LOG_MAGIC_VALUE_V1, false)); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java index b567eff1f7..2b9ed46de7 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java @@ -214,12 +214,24 @@ public static FetchLogRequest newFetchLogRequest( public static FetchLogRequest newFetchLogRequest( int followerId, long tableId, int bucketId, long fetchOffset, int[] selectedFields) { + return newFetchLogRequest( + followerId, tableId, bucketId, fetchOffset, selectedFields, false); + } + + public static FetchLogRequest newFetchLogRequest( + int followerId, + long tableId, + int bucketId, + long fetchOffset, + int[] selectedFields, + boolean isProjectByIds) { return newFetchLogRequest( followerId, tableId, bucketId, fetchOffset, selectedFields, + isProjectByIds, -1, Integer.MAX_VALUE, -1); @@ -231,6 +243,7 @@ public static FetchLogRequest newFetchLogRequest( int bucketId, long fetchOffset, int[] selectedFields, + boolean isProjectByIds, int minFetchBytes, int maxFetchBytes, int maxWaitMs) { @@ -241,6 +254,7 @@ public static FetchLogRequest newFetchLogRequest( } PbFetchLogReqForTable fetchLogReqForTable = new PbFetchLogReqForTable().setTableId(tableId); + fetchLogReqForTable.setProjectByIds(isProjectByIds); if (selectedFields != null) { fetchLogReqForTable .setProjectionPushdownEnabled(true) @@ -248,6 +262,7 @@ public static FetchLogRequest newFetchLogRequest( } else { fetchLogReqForTable.setProjectionPushdownEnabled(false); } + // TODO make the max fetch bytes configurable. PbFetchLogReqForBucket fetchLogReqForBucket = new PbFetchLogReqForBucket()