From 96b26062cedcee8696bb9eb201b2a14e9987b27b Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Sat, 20 Dec 2025 21:47:18 +0300 Subject: [PATCH 1/5] IGNITE-27431 SQL Calcite: Optimize scans with filter --- .../exec/AbstractCacheColumnsScan.java | 31 ++++- .../query/calcite/exec/AbstractCacheScan.java | 4 +- .../query/calcite/exec/IndexScan.java | 62 ++++++---- .../calcite/exec/LogicalRelImplementor.java | 75 +++++++++--- .../query/calcite/exec/SystemViewScan.java | 12 +- .../query/calcite/exec/TableRowIterable.java | 43 +++++++ .../query/calcite/exec/TableScan.java | 29 ++++- .../query/calcite/exec/rel/ScanNode.java | 36 ++++-- .../calcite/exec/rel/ScanTableRowNode.java | 115 ++++++++++++++++++ .../rel/ProjectableFilterableTableScan.java | 24 ++++ .../schema/CacheTableDescriptorImpl.java | 37 +++--- .../schema/SystemViewTableDescriptorImpl.java | 31 ++--- .../query/calcite/schema/TableDescriptor.java | 15 ++- .../calcite/planner/AbstractPlannerTest.java | 4 +- 14 files changed, 404 insertions(+), 114 deletions(-) create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java create mode 100644 modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java index c5966bf9f549bb..b77178ccfb4837 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec; +import java.util.Iterator; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory; @@ -24,7 +25,8 @@ import org.jetbrains.annotations.Nullable; /** */ -public abstract class AbstractCacheColumnsScan extends AbstractCacheScan { +public abstract class AbstractCacheColumnsScan extends AbstractCacheScan + implements TableRowIterable { /** */ protected final CacheTableDescriptor desc; @@ -34,8 +36,8 @@ public abstract class AbstractCacheColumnsScan extends AbstractCacheScan extends AbstractCacheScan tableRowIterator() { + reserve(); + + try { + return createTableRowIterator(); + } + catch (Exception e) { + release(); + + throw e; + } + } + + /** Table row iterator.*/ + protected abstract Iterator createTableRowIterator(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java index 1f46ed0d2da72f..e7bab98837e90b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java @@ -100,7 +100,7 @@ public abstract class AbstractCacheScan implements Iterable, AutoClose } /** */ - private synchronized void reserve() { + protected synchronized void reserve() { if (reservation != null) return; @@ -151,7 +151,7 @@ private synchronized void reserve() { } /** */ - private synchronized void release() { + protected synchronized void release() { if (reservation != null) reservation.release(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index 32c051666a16a8..22bfbe821c0b7a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -56,7 +56,7 @@ /** * Scan on index. */ -public class IndexScan extends AbstractCacheColumnsScan { +public class IndexScan extends AbstractCacheColumnsScan { /** */ private final GridKernalContext kctx; @@ -109,7 +109,7 @@ public IndexScan( for (int i = 0; i < srcRowType.getFieldCount(); i++) fieldsStoreTypes[i] = typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType()); - fieldIdxMapping = fieldToInlinedKeysMapping(srcRowType.getFieldCount()); + fieldIdxMapping = fieldToInlinedKeysMapping(); if (!F.isEmpty(ectx.getQryTxEntries())) { InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler(); @@ -131,7 +131,7 @@ public IndexScan( * @return Mapping from target row fields to inlined index keys, or {@code null} if inlined index keys * should not be used. */ - private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { + private int[] fieldToInlinedKeysMapping() { List inlinedKeys = idx.segment(0).rowHandler().inlineIndexKeyTypes(); // Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed. @@ -141,7 +141,7 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { // Even if we need some subset of inlined keys we are required to the read full inlined row, since this row // is also participated in comparison with other rows when cursor processing the next index page. if (inlinedKeys.size() < idx.segment(0).rowHandler().indexKeyDefinitions().size() || - inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt : requiredColumns.cardinality())) + inlinedKeys.size() < fieldColMapping.length) return null; for (InlineIndexKeyType keyType : inlinedKeys) { @@ -153,14 +153,11 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { return null; } - ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, srcFieldsCnt) : - requiredColumns; - int[] fieldIdxMapping = new int[rowType.getFieldCount()]; - for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j = reqCols.nextSetBit(j + 1), i++) { + for (int i = 0; i < fieldColMapping.length; i++) { // j = source field index, i = target field index. - int keyIdx = idxFieldMapping.indexOf(j); + int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]); if (keyIdx >= 0 && keyIdx < inlinedKeys.size()) fieldIdxMapping[i] = keyIdx; @@ -173,6 +170,11 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { /** {@inheritDoc} */ @Override protected Iterator createIterator() { + return F.iterator(createTableRowIterator(), this::indexRow2Row, true); + } + + /** {@inheritDoc} */ + @Override protected Iterator createTableRowIterator() { RangeIterable ranges0 = ranges == null ? null : new TransformRangeIterable<>(ranges, this::row2indexRow); TreeIndex treeIdx = treeIndex(); @@ -180,7 +182,20 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) { if (!txChanges.changedKeysEmpty()) treeIdx = new TxAwareTreeIndexWrapper(treeIdx); - return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0), this::indexRow2Row, true); + return new TreeIndexIterable<>(treeIdx, ranges0).iterator(); + } + + /** {@inheritDoc} */ + @Override public Row enrichRow(IndexRow idxRow, Row row, int[] fieldColMapping) { + try { + if (idxRow.indexPlainRow()) + return inlineIndexRow2Row(idxRow, row, fieldColMapping); + else + return desc.toRow(ectx, idxRow.cacheDataRow(), row, fieldColMapping); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } /** */ @@ -221,28 +236,23 @@ protected IndexRow row2indexRow(Row bound) { } /** From IndexRow to Row convertor. */ - protected Row indexRow2Row(IndexRow row) { - try { - if (row.indexPlainRow()) - return inlineIndexRow2Row(row); - else - return desc.toRow(ectx, row.cacheDataRow(), factory, requiredColumns); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + protected Row indexRow2Row(IndexRow idxRow) { + Row row = factory.create(); + + return enrichRow(idxRow, row, fieldColMapping); } /** */ - private Row inlineIndexRow2Row(IndexRow row) { + private Row inlineIndexRow2Row(IndexRow idxRow, Row row, int[] fieldColMapping) { RowHandler hnd = ectx.rowHandler(); - Row res = factory.create(); - - for (int i = 0; i < fieldIdxMapping.length; i++) - hnd.set(i, res, TypeUtils.toInternal(ectx, row.key(fieldIdxMapping[i]).key())); + for (int i = 0; i < fieldColMapping.length; i++) { + // Skip not required fields. + if (fieldColMapping[i] >= 0) + hnd.set(i, row, TypeUtils.toInternal(ectx, idxRow.key(fieldIdxMapping[i]).key())); + } - return res; + return row; } /** Query context. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index 9e05f5bf2c2235..70a392554a6dc6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode; @@ -116,6 +117,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.processors.query.calcite.util.RexUtils; import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED; import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType; @@ -368,10 +370,10 @@ private boolean hasExchange(RelNode rel) { ImmutableBitSet requiredColumns = rel.requiredColumns(); List searchBounds = rel.searchBounds(); - RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns); + RelDataType inputRowType = tbl.getRowType(typeFactory, requiredColumns); - Predicate filters = condition == null ? null : expressionFactory.predicate(condition, rowType); - Function prj = projects == null ? null : expressionFactory.project(projects, rowType); + Predicate filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType); + Function prj = projects == null ? null : expressionFactory.project(projects, inputRowType); RangeIterable ranges = searchBounds == null ? null : expressionFactory.ranges(searchBounds, rel.collation(), tbl.getRowType(typeFactory)); @@ -382,7 +384,8 @@ private boolean hasExchange(RelNode rel) { if (idx != null && !tbl.isIndexRebuildInProgress()) { Iterable rowsIter = idx.scan(ctx, grp, ranges, requiredColumns); - return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj); + return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType, + rowsIter, filters, prj, requiredColumns, rel.conditionColumns()); } else { // Index was invalidated after planning, workaround through table-scan -> sort -> index spool. @@ -402,12 +405,10 @@ private boolean hasExchange(RelNode rel) { requiredColumns ); - // If there are projects in the scan node - after the scan we already have target row type. - if (!spoolNodeRequired && projects != null) - rowType = rel.getRowType(); + RelDataType rowType = projNodeRequired ? rel.getRowType() : inputRowType; - Node node = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filterHasCorrelation ? null : filters, - projNodeRequired ? null : prj); + Node node = createStorageScan(tbl.name(), rowType, inputRowType, rowsIter, + filterHasCorrelation ? null : filters, projNodeRequired ? null : prj, requiredColumns, rel.conditionColumns()); RelCollation collation = rel.collation(); @@ -438,7 +439,7 @@ private boolean hasExchange(RelNode rel) { remappedSearchBounds.add(searchBounds.get(i)); // Collation and row type are already remapped taking into account requiredColumns. - ranges = expressionFactory.ranges(remappedSearchBounds, collation, rowType); + ranges = expressionFactory.ranges(remappedSearchBounds, collation, inputRowType); } IndexSpoolNode spoolNode = IndexSpoolNode.createTreeSpool( @@ -548,10 +549,10 @@ private boolean hasExchange(RelNode rel) { IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class); IgniteTypeFactory typeFactory = ctx.getTypeFactory(); - RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns); + RelDataType inputRowType = tbl.getRowType(typeFactory, requiredColumns); - Predicate filters = condition == null ? null : expressionFactory.predicate(condition, rowType); - Function prj = projects == null ? null : expressionFactory.project(projects, rowType); + Predicate filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType); + Function prj = projects == null ? null : expressionFactory.project(projects, inputRowType); ColocationGroup grp = ctx.group(rel.sourceId()); @@ -560,12 +561,14 @@ private boolean hasExchange(RelNode rel) { if (idx != null && !tbl.isIndexRebuildInProgress()) { Iterable rowsIter = idx.scan(ctx, grp, null, requiredColumns); - return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj); + return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType, + rowsIter, filters, prj, requiredColumns, rel.conditionColumns()); } else { Iterable rowsIter = tbl.scan(ctx, grp, requiredColumns); - return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filters, prj); + return createStorageScan(tbl.name(), rel.getRowType(), inputRowType, rowsIter, filters, prj, + requiredColumns, rel.conditionColumns()); } } @@ -943,4 +946,46 @@ private Node visit(RelNode rel) { public > T go(IgniteRel rel) { return (T)visit(rel); } + + /** */ + private ScanStorageNode createStorageScan( + String storageName, + RelDataType outputRowType, + RelDataType inputRowType, + Iterable rowsIter, + @Nullable Predicate filter, + @Nullable Function rowTransformer, + @Nullable ImmutableBitSet requiredColumns, + @Nullable ImmutableBitSet filterColumns + ) { + int fieldsCnt = outputRowType.getFieldCount(); + + if (filter == null || filterColumns == null || filterColumns.cardinality() == fieldsCnt + || !(rowsIter instanceof TableRowIterable)) + return new ScanStorageNode<>(storageName, ctx, outputRowType, rowsIter, filter, rowTransformer); + + ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, fieldsCnt) : requiredColumns; + + int[] filterColMapping = reqCols.toArray(); + int[] otherColMapping = filterColMapping.clone(); + + for (int i = 0; i < filterColMapping.length; i++) { + if (filterColumns.get(i)) + otherColMapping[i] = -1; + else + filterColMapping[i] = -1; + } + + return new ScanTableRowNode<>( + storageName, + ctx, + outputRowType, + inputRowType, + (TableRowIterable)rowsIter, + filter, + rowTransformer, + filterColMapping, + otherColMapping + ); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java index 76f71f5083799c..897b908b376310 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java @@ -47,8 +47,8 @@ public class SystemViewScan implements Iterable { /** */ private final RangeIterable ranges; - /** Participating colunms. */ - private final ImmutableBitSet requiredColumns; + /** Row field to view column mapping. */ + protected final int[] fieldColMapping; /** System view field names (for filtering). */ private final String[] filterableFieldNames; @@ -66,7 +66,6 @@ public SystemViewScan( this.ectx = ectx; this.desc = desc; this.ranges = ranges; - this.requiredColumns = requiredColumns; RelDataType rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns); @@ -83,6 +82,11 @@ public SystemViewScan( } } } + + ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, rowType.getFieldCount()) + : requiredColumns; + + fieldColMapping = reqCols.toArray(); } /** {@inheritDoc} */ @@ -123,6 +127,6 @@ public SystemViewScan( else viewIter = view.iterator(); - return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory, requiredColumns), true); + return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory.create(), fieldColMapping), true); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java new file mode 100644 index 00000000000000..e3cb2d23132fcf --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.calcite.exec; + +import java.util.Iterator; + +/** + * Interface to iterate over raw table data and convert this data to relational node rows. + * + * @param > Raw table row type. + * @param > Relational node row type. + */ +public interface TableRowIterable extends Iterable { + /** + * @return Table row iterator. + * */ + public Iterator tableRowIterator(); + + /** + * Enriches {@code nodeRow} with columns from {@code tableRow} * + * + * @param tableRow Table row. + * @param nodeRow Relational node row. + * @param fieldColMapping Mapping from node row fields to table row columns. If column is not requried + * corresponding field has -1 mapped value. + * @return Enriched relational node row. + */ + public Row enrichRow(TableRow tableRow, Row nodeRow, int[] fieldColMapping); +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java index c16252b902a0fa..f0d5f76b48d699 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java @@ -25,6 +25,7 @@ import java.util.function.Function; import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; @@ -37,7 +38,7 @@ import org.jetbrains.annotations.Nullable; /** */ -public class TableScan extends AbstractCacheColumnsScan { +public class TableScan extends AbstractCacheColumnsScan { /** */ public TableScan( ExecutionContext ectx, @@ -50,13 +51,29 @@ public TableScan( /** {@inheritDoc} */ @Override protected Iterator createIterator() { + return F.iterator((Iterator)new IteratorImpl(), + row -> enrichRow(row, factory.create(), fieldColMapping), true); + } + + /** {@inheritDoc} */ + @Override protected Iterator createTableRowIterator() { return new IteratorImpl(); } + /** */ + @Override public Row enrichRow(CacheDataRow cacheDataRow, Row row, int[] fieldColMapping) { + try { + return desc.toRow(ectx, cacheDataRow, row, fieldColMapping); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + /** * Table scan iterator. */ - private class IteratorImpl extends GridIteratorAdapter { + private class IteratorImpl extends GridIteratorAdapter { /** */ private final Queue parts; @@ -70,7 +87,7 @@ private class IteratorImpl extends GridIteratorAdapter { private Iterator txIter = Collections.emptyIterator(); /** */ - private Row next; + private CacheDataRow next; /** */ private IteratorImpl() { @@ -98,13 +115,13 @@ private IteratorImpl() { } /** {@inheritDoc} */ - @Override public Row nextX() throws IgniteCheckedException { + @Override public CacheDataRow nextX() throws IgniteCheckedException { advance(); if (next == null) throw new NoSuchElementException(); - Row next = this.next; + CacheDataRow next = this.next; this.next = null; @@ -154,7 +171,7 @@ private void advance() throws IgniteCheckedException { if (!desc.match(row)) continue; - next = desc.toRow(ectx, row, factory, requiredColumns); + next = row; break; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java index ca6469b09e6077..82ec7b5c6886da 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java @@ -34,13 +34,13 @@ public class ScanNode extends AbstractNode implements SingleNode private final Iterable src; /** */ - @Nullable private final Predicate filter; + @Nullable protected final Predicate filter; /** */ - @Nullable private final Function rowTransformer; + @Nullable protected final Function rowTransformer; /** */ - private Iterator it; + protected Iterator it; /** */ private int requested; @@ -146,25 +146,43 @@ private void push() throws Exception { } } + /** + * @return Next row, or {@code null} if row was filtered out. + */ + protected Row processNextRow() { + Row r = (Row)it.next(); + + if (filter == null || filter.test(r)) { + if (rowTransformer != null) + r = rowTransformer.apply(r); + + return r; + } + + return null; + } + + /** */ + protected Iterator sourceIterator() { + return src.iterator(); + } + /** * @return Count of processed rows. */ protected int processNextBatch() throws Exception { if (it == null) - it = src.iterator(); + it = sourceIterator(); int processed = 0; while (requested > 0 && it.hasNext()) { checkState(); - Row r = it.next(); + Row r = processNextRow(); - if (filter == null || filter.test(r)) { + if (r != null) { requested--; - if (rowTransformer != null) - r = rowTransformer.apply(r); - downstream().push(r); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java new file mode 100644 index 00000000000000..a5f76912e8921c --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.Iterator; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable; +import org.jetbrains.annotations.Nullable; + +/** + * Scan table rows node. + */ +public class ScanTableRowNode extends ScanStorageNode { + /** */ + private final TableRowIterable src; + + /** */ + private final int[] filterColMapping; + + /** */ + private final int[] otherColMapping; + + /** */ + private final RowHandler.RowFactory factory; + + /** */ + private Row curRow; + + /** + * @param storageName Storage (index or table) name. + * @param ctx Execution context. + * @param outputRowType Output row type. + * @param inputRowType Input row type. + * @param src Source. + * @param filter Row filter. + * @param rowTransformer Row transformer (projection). + * @param filterColMapping Fields to columns mapping for fields used in filter. + * @param otherColMapping Fields to columns mapping for other fields. + */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") + public ScanTableRowNode( + String storageName, + ExecutionContext ctx, + RelDataType outputRowType, + RelDataType inputRowType, + TableRowIterable src, + Predicate filter, + @Nullable Function rowTransformer, + int[] filterColMapping, + int[] otherColMapping + ) { + super(storageName, ctx, outputRowType, src, filter, rowTransformer); + + assert filter != null; + + factory = ctx.rowHandler().factory(ctx.getTypeFactory(), inputRowType); + + this.src = src; + this.filterColMapping = filterColMapping; + this.otherColMapping = otherColMapping; + } + + /** {@inheritDoc} */ + @Override protected Iterator sourceIterator() { + return src.tableRowIterator(); + } + + /** {@inheritDoc} */ + @Override protected Row processNextRow() { + Row row = curRow == null ? factory.create() : curRow; + + TableRow tableRow = (TableRow)it.next(); + + src.enrichRow(tableRow, row, filterColMapping); + + if (filter.test(row)) { + src.enrichRow(tableRow, row, otherColMapping); + + if (rowTransformer != null) + row = rowTransformer.apply(row); + + curRow = null; + + return row; + } + + return null; + } + + /** */ + @Override public void closeInternal() { + super.closeInternal(); + + curRow = null; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java index 5508ee6f69f621..51cca0019d22ab 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java @@ -63,6 +63,9 @@ public abstract class ProjectableFilterableTableScan extends TableScan { /** Participating columns. */ protected final ImmutableBitSet requiredColumns; + /** Columns used by condition. */ + protected ImmutableBitSet conditionColumns; + /** */ protected ProjectableFilterableTableScan( RelOptCluster cluster, @@ -195,4 +198,25 @@ public RelColumnOrigin columnOriginsByRelLocalRef(int colIdx) { return new RelColumnOrigin(getTable(), originColIdx, false); } + + /** */ + public @Nullable ImmutableBitSet conditionColumns() { + if (condition == null) + return null; + + if (conditionColumns == null) { + ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); + + new RexShuttle() { + @Override public RexNode visitLocalRef(RexLocalRef inputRef) { + builder.set(inputRef.getIndex()); + return inputRef; + } + }.apply(condition); + + conditionColumns = builder.build(); + } + + return conditionColumns; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java index d60eb448ba6770..00ff3d92bdcb0b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java @@ -70,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -248,36 +247,28 @@ else if (affFields.isEmpty()) /** {@inheritDoc} */ @Override public Row toRow( ExecutionContext ectx, - CacheDataRow row, - RowHandler.RowFactory factory, - @Nullable ImmutableBitSet requiredColumns + CacheDataRow tableRow, + Row row, + int[] fieldColMapping ) throws IgniteCheckedException { - RowHandler hnd = factory.handler(); + RowHandler hnd = ectx.rowHandler(); - assert hnd == ectx.rowHandler(); + assert hnd.columnCount(row) == fieldColMapping.length : + "Unexpected row column count: " + hnd.columnCount(row) + " expected: " + fieldColMapping.length; - Row res = factory.create(); + for (int i = 0; i < fieldColMapping.length; i++) { + int colIdx = fieldColMapping[i]; - assert hnd.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality()); + // Skip not required fields. + if (colIdx < 0) + continue; - if (requiredColumns == null) { - for (int i = 0; i < descriptors.length; i++) { - CacheColumnDescriptor desc = descriptors[i]; + CacheColumnDescriptor desc = descriptors[colIdx]; - hnd.set(i, res, TypeUtils.toInternal(ectx, - desc.value(ectx, cacheContext(), row), desc.storageType())); - } - } - else { - for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) { - CacheColumnDescriptor desc = descriptors[j]; - - hnd.set(i, res, TypeUtils.toInternal(ectx, - desc.value(ectx, cacheContext(), row), desc.storageType())); - } + hnd.set(i, row, TypeUtils.toInternal(ectx, desc.value(ectx, cacheContext(), tableRow), desc.storageType())); } - return res; + return row; } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java index 80d7f95b3379df..ab89ce303fb42d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableDescriptorImpl.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.systemview.view.SystemView; import org.apache.ignite.spi.systemview.view.SystemViewRowAttributeWalker; -import org.jetbrains.annotations.Nullable; import static org.apache.calcite.rel.type.RelDataType.PRECISION_NOT_SPECIFIED; import static org.apache.calcite.rel.type.RelDataType.SCALE_NOT_SPECIFIED; @@ -107,24 +106,28 @@ public String name() { /** {@inheritDoc} */ @Override public Row toRow( ExecutionContext ectx, - ViewRow row, - RowHandler.RowFactory factory, - @Nullable ImmutableBitSet requiredColumns + ViewRow viewRow, + Row row, + int[] fieldColMapping ) { - RowHandler hnd = factory.handler(); + RowHandler hnd = ectx.rowHandler(); - assert hnd == ectx.rowHandler(); + assert hnd.columnCount(row) == fieldColMapping.length : + "Unexpected row column count: " + hnd.columnCount(row) + " expected: " + fieldColMapping.length; - Row res = factory.create(); - - assert hnd.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality()); - - sysView.walker().visitAll(row, new SystemViewRowAttributeWalker.AttributeWithValueVisitor() { + sysView.walker().visitAll(viewRow, new SystemViewRowAttributeWalker.AttributeWithValueVisitor() { private int colIdx; @Override public void accept(int idx, String name, Class clazz, T val) { - if (requiredColumns == null || requiredColumns.get(idx)) - hnd.set(colIdx++, res, TypeUtils.toInternal(ectx, val, descriptors[idx].storageType())); + // Assume fieldColMapping derived from required columns and sorted. + // Assume 'accept' is called with increasing 'idx'. + + // Skip not required fields. + while (colIdx < fieldColMapping.length && fieldColMapping[colIdx] < 0) + colIdx++; + + if (colIdx < fieldColMapping.length && fieldColMapping[colIdx] == idx) + hnd.set(colIdx++, row, TypeUtils.toInternal(ectx, val, descriptors[idx].storageType())); } @Override public void acceptBoolean(int idx, String name, boolean val) { @@ -160,7 +163,7 @@ public String name() { } }); - return res; + return row; } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java index af3b72a382b9aa..4c2b82f548ae1a 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptor.java @@ -26,12 +26,10 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext; import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; -import org.jetbrains.annotations.Nullable; /** * @@ -94,19 +92,20 @@ default RelDataType selectForUpdateRowType(IgniteTypeFactory factory) { boolean isUpdateAllowed(RelOptTable tbl, int colIdx); /** - * Converts a cache row to relational node row. + * Converts a table row to relational node row. * * @param ectx Execution context. - * @param row Cache row. - * @param requiredColumns Participating columns. + * @param tableRow Table row. + * @param row Relational node row. + * @param fieldColMapping Mapping from row fields to table columns. * @return Relational node row. * @throws IgniteCheckedException If failed. */ Row toRow( ExecutionContext ectx, - TableRow row, - RowHandler.RowFactory factory, - @Nullable ImmutableBitSet requiredColumns + TableRow tableRow, + Row row, + int[] fieldColMapping ) throws IgniteCheckedException; /** diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java index 4dd43a9c80755f..512f23f68b6082 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java @@ -66,7 +66,6 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader; import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage; @@ -809,8 +808,7 @@ public TestTableDescriptor(Supplier distribution, RelDataTyp } /** {@inheritDoc} */ - @Override public Row toRow(ExecutionContext ectx, CacheDataRow row, RowHandler.RowFactory factory, - @Nullable ImmutableBitSet requiredColumns) throws IgniteCheckedException { + @Override public Row toRow(ExecutionContext ectx, CacheDataRow row, Row row2, int[] fieldColMapping) { throw new AssertionError(); } From 5913dbbb6d9220311c28acae0522729d467e2962 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Mon, 12 Jan 2026 11:23:13 +0500 Subject: [PATCH 2/5] IGNITE-27431 Review comments fixed --- .../query/calcite/exec/AbstractCacheColumnsScan.java | 5 +++-- .../internal/processors/query/calcite/exec/IndexScan.java | 2 +- .../processors/query/calcite/exec/TableRowIterable.java | 8 ++++---- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java index b77178ccfb4837..95a0f5aa47ba63 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java @@ -36,7 +36,7 @@ public abstract class AbstractCacheColumnsScan extends AbstractCa /** */ protected final RelDataType rowType; - /** Row field to column mapping. */ + /** Output node row fields to input table row columns mapping. */ protected final int[] fieldColMapping; /** */ @@ -53,7 +53,8 @@ public abstract class AbstractCacheColumnsScan extends AbstractCa rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns); factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType); - ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, rowType.getFieldCount()) + ImmutableBitSet reqCols = requiredColumns == null + ? ImmutableBitSet.range(0, rowType.getFieldCount()) : requiredColumns; fieldColMapping = reqCols.toArray(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java index 22bfbe821c0b7a..38034bc93caa05 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java @@ -156,7 +156,7 @@ private int[] fieldToInlinedKeysMapping() { int[] fieldIdxMapping = new int[rowType.getFieldCount()]; for (int i = 0; i < fieldColMapping.length; i++) { - // j = source field index, i = target field index. + // i = output row field index, fieldColMapping[i] = table row column index. int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]); if (keyIdx >= 0 && keyIdx < inlinedKeys.size()) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java index e3cb2d23132fcf..63e95fcd3a28fc 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java @@ -21,17 +21,17 @@ /** * Interface to iterate over raw table data and convert this data to relational node rows. * - * @param > Raw table row type. - * @param > Relational node row type. + * @param Raw table row type. + * @param Relational node row type. */ public interface TableRowIterable extends Iterable { /** * @return Table row iterator. - * */ + */ public Iterator tableRowIterator(); /** - * Enriches {@code nodeRow} with columns from {@code tableRow} * + * Enriches {@code nodeRow} with columns from {@code tableRow}. * * @param tableRow Table row. * @param nodeRow Relational node row. From 6b4ec6b76e65df10ca15f8b92acd1441b6192511 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Mon, 12 Jan 2026 11:42:55 +0500 Subject: [PATCH 3/5] IGNITE-27431 Fix after merge --- .../processors/query/calcite/planner/TestTable.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java index 744c351ddf6859..a4e5ab611e87f2 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java @@ -61,7 +61,6 @@ import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; -import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup; import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext; import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan; @@ -411,8 +410,12 @@ public TestTableDescriptor(Supplier distribution, RelDataTyp } /** {@inheritDoc} */ - @Override public Row toRow(ExecutionContext ectx, CacheDataRow row, RowHandler.RowFactory factory, - @Nullable ImmutableBitSet requiredColumns) { + @Override public Row toRow( + ExecutionContext ectx, + CacheDataRow cacheDataRow, + Row outputRow, + int[] fieldColMapping + ) { throw new AssertionError(); } From 08fcefaf4bcddd87658bbfbbb61675e1acdb1ee1 Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Mon, 19 Jan 2026 21:09:03 +0300 Subject: [PATCH 4/5] IGNITE-27431 Review comments fixed --- .../processors/query/calcite/exec/TableRowIterable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java index 63e95fcd3a28fc..3523419c2048eb 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableRowIterable.java @@ -19,7 +19,7 @@ import java.util.Iterator; /** - * Interface to iterate over raw table data and convert this data to relational node rows. + * Interface to iterate over raw table data and form relational node rows from table row columns. * * @param Raw table row type. * @param Relational node row type. From da6cefca4673783fa722b7f6faacdc868712033f Mon Sep 17 00:00:00 2001 From: Aleksey Plekhanov Date: Tue, 20 Jan 2026 14:15:19 +0300 Subject: [PATCH 5/5] IGNITE-27431 Review comments fixed --- .../calcite/exec/LogicalRelImplementor.java | 2 +- .../query/calcite/exec/rel/ScanNode.java | 2 +- .../calcite/exec/rel/ScanTableRowNode.java | 5 +- .../exec/LogicalRelImplementorTest.java | 80 ++++++++++++++ .../exec/rel/ScanTableRowExecutionTest.java | 104 ++++++++++++++++++ .../ignite/testsuites/ExecutionTestSuite.java | 4 +- 6 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index bc82a3e024e57d..d816ed65510887 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -957,7 +957,7 @@ private ScanStorageNode createStorageScan( @Nullable ImmutableBitSet requiredColumns, @Nullable ImmutableBitSet filterColumns ) { - int fieldsCnt = outputRowType.getFieldCount(); + int fieldsCnt = inputRowType.getFieldCount(); if (filter == null || filterColumns == null || filterColumns.cardinality() == fieldsCnt || !(rowsIter instanceof TableRowIterable)) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java index 82ec7b5c6886da..80201b26466725 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanNode.java @@ -149,7 +149,7 @@ private void push() throws Exception { /** * @return Next row, or {@code null} if row was filtered out. */ - protected Row processNextRow() { + protected @Nullable Row processNextRow() { Row r = (Row)it.next(); if (filter == null || filter.test(r)) { diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java index a5f76912e8921c..0405292f32f169 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowNode.java @@ -27,7 +27,8 @@ import org.jetbrains.annotations.Nullable; /** - * Scan table rows node. + * Scan table rows node. With ability to request part of table row columns for filtering + * ({@code ScanStorageNode} can only request full set of requred table row columns). */ public class ScanTableRowNode extends ScanStorageNode { /** */ @@ -85,7 +86,7 @@ public ScanTableRowNode( } /** {@inheritDoc} */ - @Override protected Row processNextRow() { + @Override protected @Nullable Row processNextRow() { Row row = curRow == null ? factory.create() : curRow; TableRow tableRow = (TableRow)it.next(); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java index 1afaf7005527ee..3768cb9727271d 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.UUID; import java.util.function.Predicate; @@ -37,11 +38,13 @@ import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.Mappings; import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable; import org.apache.ignite.internal.processors.query.calcite.exec.rel.CollectNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker; @@ -51,6 +54,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; +import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl; import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; @@ -59,6 +63,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import static org.apache.calcite.tools.Frameworks.createRootSchema; @@ -375,6 +380,51 @@ public void testIndexScanRewriter() { checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject); } + /** */ + @Test + public void testScanTableRow() { + RelCollation idxCollation = TraitUtils.createCollation(Collections.singletonList(2)); + tbl.addIndex(new ScannableTestIndex(idxCollation, QueryUtils.PRIMARY_KEY_INDEX, tbl)); + + RelDataType rowType = tbl.getRowType(tf); + RelDataType sqlTypeInt = rowType.getFieldList().get(2).getType(); + RelDataType sqlTypeVarchar = rowType.getFieldList().get(3).getType(); + + List project = F.asList(rexBuilder.makeLocalRef(sqlTypeVarchar, 1)); + + RexNode filterOneField = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + rexBuilder.makeLocalRef(sqlTypeInt, 0), + rexBuilder.makeLiteral(1, sqlTypeInt) + ); + + RexNode filterTwoFields = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, + rexBuilder.makeCast(sqlTypeVarchar, rexBuilder.makeLocalRef(sqlTypeInt, 0)), + rexBuilder.makeLocalRef(sqlTypeVarchar, 1) + ); + + ImmutableBitSet requiredColumns = ImmutableBitSet.of(2, 3); + + IgniteIndexScan scan = new IgniteIndexScan( + cluster, + cluster.traitSet(), + qctx.catalogReader().getTable(F.asList("PUBLIC", "TBL")), + QueryUtils.PRIMARY_KEY_INDEX, + project, + filterOneField, + RexUtils.buildSortedSearchBounds(cluster, idxCollation, filterOneField, rowType, requiredColumns), + requiredColumns, + idxCollation + ); + + // Not all fields participating in filter, it worth to create ScanTableRowNode. + checkNodesChain(relImplementor, scan, node -> node instanceof ScanTableRowNode); + + scan = createScan(scan, idxCollation, project, filterTwoFields, requiredColumns); + + // All fields participating in filter, regular ScanStorageNode should be created. + checkNodesChain(relImplementor, scan, node -> !(node instanceof ScanTableRowNode)); + } + /** */ private IgniteIndexScan createScan( IgniteIndexScan templateScan, @@ -445,4 +495,34 @@ public ScannableTestTable(RelDataType type) { return Collections.emptyList(); } } + + /** */ + private static class ScannableTestIndex extends CacheIndexImpl { + /** */ + public ScannableTestIndex(RelCollation collation, String name, TestTable tbl) { + super(collation, name, null, tbl); + } + + /** {@inheritDoc} */ + @Override public Iterable scan( + ExecutionContext execCtx, + ColocationGroup grp, + RangeIterable ranges, + @Nullable ImmutableBitSet requiredColumns + ) { + return new TableRowIterable<>() { + @Override public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override public Iterator tableRowIterator() { + return Collections.emptyIterator(); + } + + @Override public Row enrichRow(Object o, Row nodeRow, int[] fieldColMapping) { + return null; + } + }; + } + } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java new file mode 100644 index 00000000000000..f88e46d1a3c5e8 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanTableRowExecutionTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.TableRowIterable; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.junit.Test; + +/** + * Execution test for ScanTableRowNode. + */ +public class ScanTableRowExecutionTest extends AbstractExecutionTest { + /** */ + @Test + public void testScanTableRow() { + ExecutionContext ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0); + IgniteTypeFactory tf = ctx.getTypeFactory(); + RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, Boolean.class); + + List data = F.asList( + new Object[] {0, "0", true}, + new Object[] {1, "1", false}, + new Object[] {2, "2", true}, + new Object[] {3, "3", false}, + new Object[] {4, "4", true} + ); + + TableRowIterable it = new TableRowIterable<>() { + @Override public Iterator tableRowIterator() { + return data.iterator(); + } + + @Override public Object[] enrichRow(Object[] tableRow, Object[] nodeRow, int[] fieldColMapping) { + for (int i = 0; i < fieldColMapping.length; i++) { + if (fieldColMapping[i] >= 0) + nodeRow[i] = tableRow[fieldColMapping[i]]; + } + + // Due to test invariant we can't return row enriched with FALSE in the field 2. + assertNotSame(Boolean.FALSE, nodeRow[2]); + + return nodeRow; + } + + @Override public Iterator iterator() { + throw new AssertionError("Unexpected call"); + } + }; + + ScanTableRowNode scan = new ScanTableRowNode<>( + "test", + ctx, + rowType, // Output row type. + rowType, // Input row type. + it, // Iterator. + r -> ((int)r[0] & 1) == 0, // Filter. + r -> new Object[] {((int)r[0]) * 2, r[1], r[2]}, // Project. + new int[] {0, -1, -1}, // Filter columns mapping. + new int[] {-1, 1, 2} // Other columns mapping. + ); + + RootNode root = new RootNode<>(ctx, rowType); + root.register(scan); + + List res = new ArrayList<>(); + + while (root.hasNext()) + res.add(root.next()); + + List exp = F.asList( + new Object[] {0, "0", true}, + new Object[] {4, "2", true}, + new Object[] {8, "4", true} + ); + + assertEquals(exp.size(), res.size()); + + for (int i = 0; i < exp.size(); i++) + assertEqualsArraysAware(exp.get(i), res.get(i)); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java index bd6f6eb2258c37..c8fbb9d90b11d8 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.MinusExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortedIndexSpoolExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest; @@ -38,7 +39,7 @@ import org.junit.runners.Suite; /** - * Calcite tests. + * Calcite execution tests. */ @RunWith(Suite.class) @Suite.SuiteClasses({ @@ -59,6 +60,7 @@ LimitExecutionTest.class, TimeCalculationExecutionTest.class, UncollectExecutionTest.class, + ScanTableRowExecutionTest.class, }) public class ExecutionTestSuite { }