Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.Iterator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
import org.jetbrains.annotations.Nullable;

/** */
public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Row> {
public abstract class AbstractCacheColumnsScan<TableRow, Row> extends AbstractCacheScan<Row>
implements TableRowIterable<TableRow, Row> {
/** */
protected final CacheTableDescriptor desc;

Expand All @@ -34,8 +36,8 @@ public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Ro
/** */
protected final RelDataType rowType;

/** Participating columns. */
protected final ImmutableBitSet requiredColumns;
/** Output node row fields to input table row columns mapping. */
protected final int[] fieldColMapping;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: it looks bit confusing. Let's rename pameters and fields like this everywhere to something like:
rawToRelColMapping and/or relToRawColMapping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we have mapping from node row fields to table row columns. We have identical naming in IndexScan: idxFieldMapping - from index keys to row fields, fieldIdxMapping - from row fields to index keys.
Maybe just extend the javadoc?


/** */
AbstractCacheColumnsScan(
Expand All @@ -47,9 +49,31 @@ public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Ro
super(ectx, desc.cacheContext(), parts);

this.desc = desc;
this.requiredColumns = requiredColumns;

rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);

ImmutableBitSet reqCols = requiredColumns == null
? ImmutableBitSet.range(0, rowType.getFieldCount())
: requiredColumns;

fieldColMapping = reqCols.toArray();
}

/** {@inheritDoc} */
@Override public final Iterator<TableRow> tableRowIterator() {
reserve();

try {
return createTableRowIterator();
}
catch (Exception e) {
release();

throw e;
}
}

/** Table row iterator.*/
protected abstract Iterator<TableRow> createTableRowIterator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
}

/** */
private synchronized void reserve() {
protected synchronized void reserve() {
if (reservation != null)
return;

Expand Down Expand Up @@ -161,7 +161,7 @@ protected void processReservedTopology(GridDhtPartitionTopology top) {
}

/** */
private synchronized void release() {
protected synchronized void release() {
if (reservation != null)
reservation.release();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Scan on index.
*/
public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
public class IndexScan<Row> extends AbstractCacheColumnsScan<IndexRow, Row> {
/** */
private final GridKernalContext kctx;

Expand Down Expand Up @@ -109,7 +109,7 @@ public IndexScan(
for (int i = 0; i < srcRowType.getFieldCount(); i++)
fieldsStoreTypes[i] = typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());

fieldIdxMapping = fieldToInlinedKeysMapping(srcRowType.getFieldCount());
fieldIdxMapping = fieldToInlinedKeysMapping();

if (!F.isEmpty(ectx.getQryTxEntries())) {
InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
Expand All @@ -131,7 +131,7 @@ public IndexScan(
* @return Mapping from target row fields to inlined index keys, or {@code null} if inlined index keys
* should not be used.
*/
private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
private int[] fieldToInlinedKeysMapping() {
List<InlineIndexKeyType> inlinedKeys = idx.segment(0).rowHandler().inlineIndexKeyTypes();

// Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
Expand All @@ -141,7 +141,7 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
// Even if we need some subset of inlined keys we are required to the read full inlined row, since this row
// is also participated in comparison with other rows when cursor processing the next index page.
if (inlinedKeys.size() < idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt : requiredColumns.cardinality()))
inlinedKeys.size() < fieldColMapping.length)
return null;

for (InlineIndexKeyType keyType : inlinedKeys) {
Expand All @@ -153,14 +153,11 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
return null;
}

ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, srcFieldsCnt) :
requiredColumns;

int[] fieldIdxMapping = new int[rowType.getFieldCount()];

for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j = reqCols.nextSetBit(j + 1), i++) {
// j = source field index, i = target field index.
int keyIdx = idxFieldMapping.indexOf(j);
for (int i = 0; i < fieldColMapping.length; i++) {
// i = output row field index, fieldColMapping[i] = table row column index.
int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]);

if (keyIdx >= 0 && keyIdx < inlinedKeys.size())
fieldIdxMapping[i] = keyIdx;
Expand All @@ -173,14 +170,32 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {

/** {@inheritDoc} */
@Override protected Iterator<Row> createIterator() {
return F.iterator(createTableRowIterator(), this::indexRow2Row, true);
}

/** {@inheritDoc} */
@Override protected Iterator<IndexRow> createTableRowIterator() {
RangeIterable<IndexRow> ranges0 = ranges == null ? null : new TransformRangeIterable<>(ranges, this::row2indexRow);

TreeIndex<IndexRow> treeIdx = treeIndex();

if (!txChanges.changedKeysEmpty())
treeIdx = new TxAwareTreeIndexWrapper(treeIdx);

return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0), this::indexRow2Row, true);
return new TreeIndexIterable<>(treeIdx, ranges0).iterator();
}

/** {@inheritDoc} */
@Override public Row enrichRow(IndexRow idxRow, Row row, int[] fieldColMapping) {
try {
if (idxRow.indexPlainRow())
return inlineIndexRow2Row(idxRow, row, fieldColMapping);
else
return desc.toRow(ectx, idxRow.cacheDataRow(), row, fieldColMapping);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}

/** */
Expand Down Expand Up @@ -221,28 +236,23 @@ protected IndexRow row2indexRow(Row bound) {
}

/** From IndexRow to Row convertor. */
protected Row indexRow2Row(IndexRow row) {
try {
if (row.indexPlainRow())
return inlineIndexRow2Row(row);
else
return desc.toRow(ectx, row.cacheDataRow(), factory, requiredColumns);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
protected Row indexRow2Row(IndexRow idxRow) {
Row row = factory.create();

return enrichRow(idxRow, row, fieldColMapping);
}

/** */
private Row inlineIndexRow2Row(IndexRow row) {
private Row inlineIndexRow2Row(IndexRow idxRow, Row row, int[] fieldColMapping) {
RowHandler<Row> hnd = ectx.rowHandler();

Row res = factory.create();

for (int i = 0; i < fieldIdxMapping.length; i++)
hnd.set(i, res, TypeUtils.toInternal(ectx, row.key(fieldIdxMapping[i]).key()));
for (int i = 0; i < fieldColMapping.length; i++) {
// Skip not required fields.
if (fieldColMapping[i] >= 0)
hnd.set(i, row, TypeUtils.toInternal(ectx, idxRow.key(fieldIdxMapping[i]).key()));
}

return res;
return row;
}

/** Query context. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
Expand Down Expand Up @@ -116,6 +117,7 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType;
Expand Down Expand Up @@ -368,10 +370,10 @@ private boolean hasExchange(RelNode rel) {
ImmutableBitSet requiredColumns = rel.requiredColumns();
List<SearchBounds> searchBounds = rel.searchBounds();

RelDataType rowType = rel.getDataSourceRowType();
RelDataType inputRowType = rel.getDataSourceRowType();

Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType);
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, inputRowType);
RangeIterable<Row> ranges = searchBounds == null ? null :
expressionFactory.ranges(searchBounds, rel.collation(), tbl.getRowType(typeFactory));

Expand All @@ -382,7 +384,8 @@ private boolean hasExchange(RelNode rel) {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, requiredColumns);

return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj);
return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType,
rowsIter, filters, prj, requiredColumns, rel.conditionColumns());
}
else {
// Index was invalidated after planning, workaround through table-scan -> sort -> index spool.
Expand All @@ -402,12 +405,10 @@ private boolean hasExchange(RelNode rel) {
requiredColumns
);

// If there are projects in the scan node - after the scan we already have target row type.
if (!spoolNodeRequired && projects != null)
rowType = rel.getRowType();
RelDataType rowType = projNodeRequired ? rel.getRowType() : inputRowType;

Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filterHasCorrelation ? null : filters,
projNodeRequired ? null : prj);
Node<Row> node = createStorageScan(tbl.name(), rowType, inputRowType, rowsIter,
filterHasCorrelation ? null : filters, projNodeRequired ? null : prj, requiredColumns, rel.conditionColumns());

RelCollation collation = rel.collation();

Expand Down Expand Up @@ -438,7 +439,7 @@ private boolean hasExchange(RelNode rel) {
remappedSearchBounds.add(searchBounds.get(i));

// Collation and row type are already remapped taking into account requiredColumns.
ranges = expressionFactory.ranges(remappedSearchBounds, collation, rowType);
ranges = expressionFactory.ranges(remappedSearchBounds, collation, inputRowType);
}

IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
Expand Down Expand Up @@ -547,10 +548,10 @@ private boolean hasExchange(RelNode rel) {

IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);

RelDataType rowType = rel.getDataSourceRowType();
RelDataType inputRowType = rel.getDataSourceRowType();

Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType);
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, inputRowType);

ColocationGroup grp = ctx.group(rel.sourceId());

Expand All @@ -559,12 +560,14 @@ private boolean hasExchange(RelNode rel) {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, null, requiredColumns);

return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj);
return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType,
rowsIter, filters, prj, requiredColumns, rel.conditionColumns());
}
else {
Iterable<Row> rowsIter = tbl.scan(ctx, grp, requiredColumns);

return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filters, prj);
return createStorageScan(tbl.name(), rel.getRowType(), inputRowType, rowsIter, filters, prj,
requiredColumns, rel.conditionColumns());
}
}

Expand Down Expand Up @@ -942,4 +945,46 @@ private Node<Row> visit(RelNode rel) {
public <T extends Node<Row>> T go(IgniteRel rel) {
return (T)visit(rel);
}

/** */
private ScanStorageNode<Row> createStorageScan(
String storageName,
RelDataType outputRowType,
RelDataType inputRowType,
Iterable<Row> rowsIter,
@Nullable Predicate<Row> filter,
@Nullable Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet requiredColumns,
@Nullable ImmutableBitSet filterColumns
) {
int fieldsCnt = outputRowType.getFieldCount();

if (filter == null || filterColumns == null || filterColumns.cardinality() == fieldsCnt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I do something like "select i1 from t where i2=?" Can it happen? filterColumns.cardinality() == fieldsCnt but output field and filter field are different. Do we need at least any assert here?

|| !(rowsIter instanceof TableRowIterable))
return new ScanStorageNode<>(storageName, ctx, outputRowType, rowsIter, filter, rowTransformer);

ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, fieldsCnt) : requiredColumns;

int[] filterColMapping = reqCols.toArray();
int[] otherColMapping = filterColMapping.clone();

for (int i = 0; i < filterColMapping.length; i++) {
if (filterColumns.get(i))
otherColMapping[i] = -1;
else
filterColMapping[i] = -1;
}

return new ScanTableRowNode<>(
storageName,
ctx,
outputRowType,
inputRowType,
(TableRowIterable<Object, Row>)rowsIter,
filter,
rowTransformer,
filterColMapping,
otherColMapping
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class SystemViewScan<Row, ViewRow> implements Iterable<Row> {
/** */
private final RangeIterable<Row> ranges;

/** Participating colunms. */
private final ImmutableBitSet requiredColumns;
/** Row field to view column mapping. */
protected final int[] fieldColMapping;

/** System view field names (for filtering). */
private final String[] filterableFieldNames;
Expand All @@ -66,7 +66,6 @@ public SystemViewScan(
this.ectx = ectx;
this.desc = desc;
this.ranges = ranges;
this.requiredColumns = requiredColumns;

RelDataType rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);

Expand All @@ -83,6 +82,11 @@ public SystemViewScan(
}
}
}

ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, rowType.getFieldCount())
: requiredColumns;

fieldColMapping = reqCols.toArray();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -123,6 +127,6 @@ public SystemViewScan(
else
viewIter = view.iterator();

return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory, requiredColumns), true);
return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory.create(), fieldColMapping), true);
}
}
Loading
Loading