diff --git a/docs/_docs/SQL/sql-calcite.adoc b/docs/_docs/SQL/sql-calcite.adoc index 2e3ddc766e8b2..19311e0a603f3 100644 --- a/docs/_docs/SQL/sql-calcite.adoc +++ b/docs/_docs/SQL/sql-calcite.adoc @@ -371,10 +371,10 @@ SELECT /*+ ENFORCE_JOIN_ORDER */ T1.V1, T2.V1, T2.V2, T3.V1, T3.V2, T3.V3 FROM T SELECT t1.v1, t3.v2 FROM TBL1 t1 JOIN TBL3 t3 on t1.v3=t3.v3 WHERE t1.v2 in (SELECT /*+ ENFORCE_JOIN_ORDER */ t2.v2 FROM TBL2 t2 JOIN TBL3 t3 ON t2.v1=t3.v1) ---- -==== MERGE_JOIN, NL_JOIN, CNL_JOIN -Forces certain join type: Merge, Nested Loop and Correlated Nested Loop respectively. +==== MERGE_JOIN, NL_JOIN, CNL_JOIN, HASH_JOIN +Forces certain join type: Merge, Nested Loop, Correlated Nested Loop and Hash Join respectively. -Every of those has the negation like 'NO_INDEX': CNL_JOIN, NO_CNL_JOIN. The negation hint disables certain join type. +Every of those has the negation like 'NO_INDEX': CNL_JOIN, NO_CNL_JOIN, NO_HASH_JOIN. The negation hint disables certain join type. ===== Parameters: * Empty. To force or disable certain join type for every join. @@ -389,7 +389,7 @@ SELECT /*+ NL_JOIN(TBL3,TBL1) */ t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3 SELECT t1.v1, t2.v2 FROM TBL2 t1 JOIN TBL1 t2 on t1.v3=t2.v3 WHERE t2.v3 in (SELECT /*+ NO_CNL_JOIN(TBL4) */ t3.v3 FROM TBL3 t3 JOIN TBL4 t4 on t3.v1=t4.v1) -SELECT t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3=t2.v3 WHERE t2.v1 in (SELECT t3.v3 FROM TBL3 t3 JOIN TBL1 /*+ NL_JOIN */ t4 on t3.v2=t4.v2) +SELECT t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3=t2.v3 WHERE t2.v1 in (SELECT t3.v3 FROM TBL3 t3 JOIN TBL1 /*+ HASH_JOIN */ t4 on t3.v2=t4.v2) ---- ==== EXPAND_DISTINCT_AGG diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index e1687bcbdc625..466fe822707b3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -37,6 +38,7 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.mapping.IntPair; import org.apache.ignite.internal.processors.failure.FailureProcessor; @@ -51,6 +53,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashJoinNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode; import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectNode; @@ -77,9 +80,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin; @@ -268,6 +273,34 @@ public LogicalRelImplementor( return node; } + /** {@inheritDoc} */ + @Override public Node visit(IgniteHashJoin rel) { + RelDataType outType = rel.getRowType(); + RelDataType leftType = rel.getLeft().getRowType(); + RelDataType rightType = rel.getRight().getRowType(); + JoinRelType joinType = rel.getJoinType(); + + IgniteJoinInfo joinInfo = IgniteJoinInfo.of(rel); + + RexNode nonEquiConditionExpression = RexUtil.composeConjunction(Commons.emptyCluster().getRexBuilder(), + rel.analyzeCondition().nonEquiConditions, true); + + BiPredicate nonEquiCondition = null; + + if (nonEquiConditionExpression != null) { + RelDataType rowType = combinedRowType(ctx.getTypeFactory(), leftType, rightType); + + nonEquiCondition = expressionFactory.biPredicate(rel.getCondition(), rowType); + } + + Node node = HashJoinNode.create(ctx, outType, leftType, rightType, joinType, joinInfo, + nonEquiCondition); + + node.register(Arrays.asList(visit(rel.getLeft()), visit(rel.getRight()))); + + return node; + } + /** {@inheritDoc} */ @Override public Node visit(IgniteCorrelatedNestedLoopJoin rel) { RelDataType outType = rel.getRowType(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java index 6d5a9b3f61b25..06947d6d5cfa0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java @@ -36,6 +36,12 @@ public MappingRowHandler(RowHandler delegate, ImmutableBitSet requiredColum mapping = requiredColumns.toArray(); } + /** */ + public MappingRowHandler(RowHandler delegate, int[] requiredColumns) { + this.delegate = delegate; + mapping = requiredColumns; + } + /** {@inheritDoc} */ @Override public Object get(int field, Row row) { return delegate.get(mapping[field], row); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java index 0e670e1da5f31..a37311afd6cc4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java @@ -17,15 +17,17 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -39,26 +41,51 @@ public class RuntimeHashIndex implements RuntimeIndex { private final RowHandler keysRowHnd; /** Rows. */ - private final HashMap, List> rows; + private final Map, Collection> rows; - /** Allow NULL values. */ - private final boolean allowNulls; + /** */ + private final Supplier> collectionFactory; - /** - * - */ + /** */ + private final ImmutableBitSet nullsMatch; + + /** Creates hash index with the default collection supplier. */ + public RuntimeHashIndex(ExecutionContext ectx, ImmutableBitSet keys, ImmutableBitSet nullsMatch) { + this(ectx, keys.toArray(), nullsMatch, -1, null); + } + + /** */ public RuntimeHashIndex( ExecutionContext ectx, - ImmutableBitSet keys, - boolean allowNulls + int[] keys, + ImmutableBitSet nullsMatch, + int initCapacity, + @Nullable Supplier> collectionFactory + ) { + this( + ectx, + nullsMatch, + new MappingRowHandler<>(ectx.rowHandler(), keys), + initCapacity >= 0 ? new HashMap<>(initCapacity) : new HashMap<>(), + collectionFactory + ); + } + + /** Fields setting constructor. */ + private RuntimeHashIndex( + ExecutionContext ectx, + ImmutableBitSet nullsMatch, + RowHandler keysRowHnd, + Map, Collection> rows, + @Nullable Supplier> collectionFactory ) { this.ectx = ectx; - this.allowNulls = allowNulls; + this.nullsMatch = nullsMatch; - assert !F.isEmpty(keys); + this.keysRowHnd = keysRowHnd; + this.rows = rows; - keysRowHnd = new MappingRowHandler<>(ectx.rowHandler(), keys); - rows = new HashMap<>(); + this.collectionFactory = collectionFactory == null ? ArrayList::new : collectionFactory; } /** {@inheritDoc} */ @@ -68,7 +95,7 @@ public RuntimeHashIndex( if (key == null) return; - List eqRows = rows.computeIfAbsent(key, k -> new ArrayList<>()); + Collection eqRows = rows.computeIfAbsent(key, k -> collectionFactory.get()); eqRows.add(r); } @@ -79,7 +106,12 @@ public RuntimeHashIndex( } /** */ - public Iterable scan(Supplier searchRow) { + public Collection> rowSets() { + return Collections.unmodifiableCollection(rows.values()); + } + + /** */ + public IndexScan scan(Supplier searchRow) { return new IndexScan(searchRow); } @@ -89,40 +121,92 @@ public Iterable scan(Supplier searchRow) { * IS NOT DISTINCT FROM condition). */ private @Nullable GroupKey key(Row r) { - if (!allowNulls) { - for (int i = 0; i < keysRowHnd.columnCount(r); i++) { - if (keysRowHnd.get(i, r) == null) - return null; + return new NullsCheckingGroupKey<>(r, keysRowHnd); + } + + /** */ + public RuntimeHashIndex remappedSearcher(int[] remappedKeys) { + return new RemappedSearcher<>(this, remappedKeys); + } + + /** */ + private class NullsCheckingGroupKey extends GroupKey { + /** */ + private NullsCheckingGroupKey(Row row, RowHandler hnd) { + super(row, hnd); + } + + /** {@inheritDoc} */ + @Override protected boolean columnValuesEquals(int colIdx, Object v1, Object v2) { + if (v1 == null && v2 == null) { + if (nullsMatch.cardinality() == 0) + return false; + + return nullsMatch.get(colIdx); } + + return super.columnValuesEquals(colIdx, v1, v2); + } + + /** {@inheritDoc} */ + @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException { + throw new UnsupportedOperationException("Serialization of row keys is not supported."); + } + + /** {@inheritDoc} */ + @Override public void readBinary(BinaryReader reader) throws BinaryObjectException { + throw new UnsupportedOperationException("Deserialization of row keys is not supported."); + } + } + + /** */ + private static class RemappedSearcher extends RuntimeHashIndex { + /** */ + private final RuntimeHashIndex origin; + + /** */ + private RemappedSearcher(RuntimeHashIndex o, int[] remappedKeys) { + super(o.ectx, o.nullsMatch, new MappingRowHandler<>(o.ectx.rowHandler(), remappedKeys), o.rows, + o.collectionFactory); + + origin = o; } - return new GroupKey<>(r, keysRowHnd); + /** {@inheritDoc} */ + @Override public void push(Row r) { + origin.push(r); + } } /** * */ - private class IndexScan implements Iterable { + public class IndexScan implements Iterable { /** Search row. */ private final Supplier searchRow; /** * @param searchRow Search row. */ - IndexScan(Supplier searchRow) { + private IndexScan(Supplier searchRow) { this.searchRow = searchRow; } - /** {@inheritDoc} */ - @NotNull @Override public Iterator iterator() { + /** */ + public @Nullable Collection get() { GroupKey key = key(searchRow.get()); if (key == null) - return Collections.emptyIterator(); + return null; - List eqRows = rows.get(key); + return rows.get(key); + } + + /** {@inheritDoc} */ + @Override public Iterator iterator() { + Collection collection = get(); - return eqRows == null ? Collections.emptyIterator() : eqRows.iterator(); + return collection == null ? Collections.emptyIterator() : collection.iterator(); } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java index 42fb13507b97a..c771857b35c96 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java @@ -95,13 +95,21 @@ public RowHandler rowHandler() { return false; for (int i = 0; i < colCnt; i++) { - if (!Objects.equals(hnd.get(i, row), other.hnd.get(i, other.row))) + Object o1 = hnd.get(i, row); + Object o2 = other.hnd.get(i, other.row); + + if (!columnValuesEquals(i, o1, o2) || !Objects.equals(o1, o2)) return false; } return true; } + /** */ + protected boolean columnValuesEquals(int colIdx, Object val1, Object val2) { + return true; + } + /** {@inheritDoc} */ @Override public int hashCode() { int hashCode = 0; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java new file mode 100644 index 0000000000000..7dce770bcaeed --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractRightMaterializedJoinNode.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayDeque; +import java.util.Deque; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Right-part materialized join node. Holds data from the right part locally. */ +public abstract class AbstractRightMaterializedJoinNode extends MemoryTrackingNode { + /** */ + protected static final int HALF_BUF_SIZE = IN_BUFFER_SIZE >> 1; + + /** Special flag which marks that all the rows are received. */ + protected static final int NOT_WAITING = -1; + + /** */ + protected final Deque leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE); + + /** */ + protected boolean inLoop; + + /** */ + protected int requested; + + /** */ + protected int waitingLeft; + + /** */ + protected int waitingRight; + + /** */ + protected @Nullable Row left; + + /** */ + protected int processed; + + /** */ + protected AbstractRightMaterializedJoinNode(ExecutionContext ctx, RelDataType rowType) { + super(ctx, rowType); + } + + /** */ + protected abstract void join() throws Exception; + + /** */ + protected abstract void pushRight(Row row) throws Exception; + + /** {@inheritDoc} */ + @Override public void request(int rowsCnt) throws Exception { + assert !F.isEmpty(sources()) && sources().size() == 2; + assert rowsCnt > 0 && requested == 0; + + requested = rowsCnt; + + if (!inLoop) + context().execute(this::join0, this::onError); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + requested = 0; + waitingLeft = 0; + waitingRight = 0; + left = null; + + leftInBuf.clear(); + } + + /** {@inheritDoc} */ + @Override protected Downstream requestDownstream(int idx) { + if (idx == 0) { + return new Downstream<>() { + @Override public void push(Row row) throws Exception { + pushLeft(row); + } + + @Override public void end() throws Exception { + endLeft(); + } + + @Override public void onError(Throwable e) { + AbstractRightMaterializedJoinNode.this.onError(e); + } + }; + } + else if (idx == 1) { + return new Downstream<>() { + @Override public void push(Row row) throws Exception { + pushRight(row); + } + + @Override public void end() throws Exception { + endRight(); + } + + @Override public void onError(Throwable e) { + AbstractRightMaterializedJoinNode.this.onError(e); + } + }; + } + + throw new IndexOutOfBoundsException(); + } + + /** */ + private void pushLeft(Row row) throws Exception { + assert downstream() != null; + assert waitingLeft > 0; + + --waitingLeft; + + leftInBuf.add(row); + + join0(); + } + + /** */ + private void endLeft() throws Exception { + assert downstream() != null; + assert waitingLeft > 0; + + waitingLeft = NOT_WAITING; + + join0(); + } + + /** */ + private void endRight() throws Exception { + assert downstream() != null; + assert waitingRight > 0; + + waitingRight = NOT_WAITING; + + join0(); + } + + /** */ + protected void tryToRequestInputs() throws Exception { + if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE) + leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); + + if (waitingRight == 0 && requested > 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected Node leftSource() { + return sources().get(0); + } + + /** */ + protected Node rightSource() { + return sources().get(1); + } + + /** */ + private void join0() throws Exception { + checkState(); + + processed = 0; + + join(); + } + + /** */ + protected boolean rescheduleJoin() { + if (processed++ > IN_BUFFER_SIZE) { + context().execute(this::join0, this::onError); + + return true; + } + + return false; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java new file mode 100644 index 0000000000000..63c0a0f837bbe --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java @@ -0,0 +1,772 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode extends AbstractRightMaterializedJoinNode { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex rightHashStore; + + /** Uses keys of right hand to find matching left rows. */ + protected final RuntimeHashIndex remappedLeftSearcher; + + /** */ + protected Iterator rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (!keepRowsWithNull && info.allowNulls().cardinality() > 0) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, rightKeys, keepRowsWithNull ? info.allowNulls() : ImmutableBitSet.of(), + INITIAL_CAPACITY, TouchedArrayList::new); + + remappedLeftSearcher = rightHashStore.remappedSearcher(leftKeys); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + super.rewindInternal(); + + rightIt = Collections.emptyIterator(); + + rightHashStore.close(); + } + + /** Creates certain join node. */ + public static HashJoinNode create( + ExecutionContext ctx, + RelDataType rowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType type, + IgniteJoinInfo info, + @Nullable BiPredicate nonEqCond + ) { + assert !info.pairs().isEmpty() && (info.isEqui() || type == JoinRelType.INNER || type == JoinRelType.SEMI); + assert nonEqCond == null || type == JoinRelType.INNER || type == JoinRelType.SEMI; + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + RowHandler rowHnd = ctx.rowHandler(); + + switch (type) { + case INNER: + return new InnerHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case LEFT: + return new LeftHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, rightRowType)); + + case RIGHT: + return new RightHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType)); + + case FULL: { + return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType), + rowHnd.factory(typeFactory, rightRowType), nonEqCond); + } + + case SEMI: + return new SemiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case ANTI: + return new AntiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + default: + throw new IllegalArgumentException("Join of type '" + type + "' isn't supported."); + } + } + + /** */ + protected Collection lookup(Row row) { + Collection res = remappedLeftSearcher.scan(() -> row).get(); + + if (res == null) + return Collections.emptyList(); + + assert res instanceof TouchedArrayList; + + ((TouchedArrayList)res).touched = true; + + return res; + } + + /** */ + protected Iterator untouched() { + return F.flat(F.iterator(rightHashStore.rowSets(), c0 -> c0, true, c1 -> !((TouchedArrayList)c1).touched)); + } + + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { + assert downstream() != null; + assert waitingRight > 0; + + checkState(); + + nodeMemoryTracker.onRowAdded(row); + + waitingRight--; + + rightHashStore.push(row); + + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected boolean leftFinished() { + return waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty(); + } + + /** */ + protected boolean rightFinished() { + return waitingRight == NOT_WAITING && !rightIt.hasNext(); + } + + /** */ + protected boolean checkJoinFinished() throws Exception { + if (requested > 0 && leftFinished() && rightFinished()) { + requested = 0; + + rightHashStore.close(); + + downstream().end(); + + return true; + } + + return false; + } + + /** */ + private static final class InnerHashJoin extends HashJoinNode { + /** + * Creates node for INNER JOIN. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + private InnerHashJoin(ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + @Nullable BiPredicate nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, false, nonEqCond); + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + // Proceed with next left row, if previous was fully processed. + if (left == null) { + left = leftInBuf.remove(); + + rightIt = lookup(left).iterator(); + } + + if (rightIt.hasNext()) { + // Emits matched rows. + while (requested > 0 && rightIt.hasNext()) { + checkState(); + + if (rescheduleJoin()) + return; + + RowT right = rightIt.next(); + + if (nonEqCond != null && !nonEqCond.test(left, right)) + continue; + + --requested; + + downstream().push(outRowHnd.concat(left, right)); + } + + if (!rightIt.hasNext()) + left = null; + } + else + left = null; + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + } + + /** */ + private static final class LeftHashJoin extends HashJoinNode { + /** Right row factory. */ + private final RowHandler.RowFactory rightRowFactory; + + /** + * Creates node for LEFT OUTER JOIN. + * + * @param ctx Execution context. + * @param info Join info. + * @param rowType Out row type. + * @param outRowHnd Output row handler. + * @param rightRowFactory Right row factory. + */ + private LeftHashJoin( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + RowHandler.RowFactory rightRowFactory + ) { + super(ctx, rowType, info, outRowHnd, false, null); + + assert nonEqCond == null : "Non equi condition is not supported in LEFT join"; + + this.rightRowFactory = rightRowFactory; + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + checkState(); + + // Proceed with next left row, if previous was fully processed. + if (!rightIt.hasNext()) { + left = leftInBuf.remove(); + + Collection rightRows = lookup(left); + + if (rightRows.isEmpty()) { + requested--; + + downstream().push(outRowHnd.concat(left, rightRowFactory.create())); + } + + rightIt = rightRows.iterator(); + } + + if (rightIt.hasNext()) { + // Emit unmatched left row. + while (requested > 0 && rightIt.hasNext()) { + checkState(); + + if (rescheduleJoin()) + return; + + RowT right = rightIt.next(); + + --requested; + + downstream().push(outRowHnd.concat(left, right)); + } + + if (!rightIt.hasNext()) + left = null; + } + else + left = null; + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + } + + /** */ + private static final class RightHashJoin extends HashJoinNode { + /** Left row factory. */ + private final RowHandler.RowFactory leftRowFactory; + + /** */ + private boolean drainMaterialization; + + /** + * Creates node for RIGHT OUTER JOIN. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param leftRowFactory Left row factory. + */ + private RightHashJoin( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + RowHandler.RowFactory leftRowFactory + ) { + super(ctx, rowType, info, outRowHnd, true, null); + + assert nonEqCond == null : "Non equi condition is not supported in RIGHT join"; + + this.leftRowFactory = leftRowFactory; + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + checkState(); + + // Proceed with next left row, if previous was fully processed. + if (!rightIt.hasNext()) { + left = leftInBuf.remove(); + + rightIt = lookup(left).iterator(); + } + + if (rightIt.hasNext()) { + // Emits matched rows. + while (requested > 0 && rightIt.hasNext()) { + checkState(); + + if (rescheduleJoin()) + return; + + RowT right = rightIt.next(); + + --requested; + + downstream().push(outRowHnd.concat(left, right)); + } + + if (!rightIt.hasNext()) + left = null; + } + else + left = null; + } + } + finally { + inLoop = false; + } + } + + // Emit unmatched right rows. + if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) { + inLoop = true; + + try { + if (!rightIt.hasNext() && !drainMaterialization) { + // Prevent scanning store more than once. + drainMaterialization = true; + + rightIt = untouched(); + } + + RowT emptyLeft = leftRowFactory.create(); + + while (requested > 0 && rightIt.hasNext()) { + checkState(); + + if (rescheduleJoin()) + return; + + RowT right = rightIt.next(); + + RowT row = outRowHnd.concat(emptyLeft, right); + + --requested; + + downstream().push(row); + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + drainMaterialization = false; + + super.rewindInternal(); + } + } + + /** */ + private static final class FullOuterHashJoin extends HashJoinNode { + /** Left row factory. */ + private final RowHandler.RowFactory leftRowFactory; + + /** Right row factory. */ + private final RowHandler.RowFactory rightRowFactory; + + /** */ + private boolean drainMaterialization; + + /** + * Creates node for FULL OUTER JOIN. + * + * @param ctx Execution context. + * @param rowType Row type. + * @param info Join info. + * @param outRowHnd Out row handler. + * @param leftRowFactory Left row factory. + * @param rightRowFactory Right row factory. + */ + private FullOuterHashJoin( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + RowHandler.RowFactory leftRowFactory, + RowHandler.RowFactory rightRowFactory, + @Nullable BiPredicate nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, true, null); + + assert nonEqCond == null : "Non equi condition is not supported in FULL OUTER join"; + + this.leftRowFactory = leftRowFactory; + this.rightRowFactory = rightRowFactory; + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + checkState(); + + // Proceed with next left row, if previous was fully processed. + if (!rightIt.hasNext()) { + left = leftInBuf.remove(); + + Collection rightRows = lookup(left); + + if (rightRows.isEmpty()) { + // Emit empty right row for unmatched left row. + rightIt = Collections.singletonList(rightRowFactory.create()).iterator(); + } + else + rightIt = rightRows.iterator(); + } + + if (rightIt.hasNext()) { + // Emits matched rows. + while (requested > 0 && rightIt.hasNext()) { + if (rescheduleJoin()) + return; + + checkState(); + + RowT right = rightIt.next(); + + --requested; + + downstream().push(outRowHnd.concat(left, right)); + } + + if (!rightIt.hasNext()) + left = null; + } + else + left = null; + } + } + finally { + inLoop = false; + } + } + + // Emit unmatched right rows. + if (left == null && leftInBuf.isEmpty() && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && requested > 0) { + inLoop = true; + + try { + if (!rightIt.hasNext() && !drainMaterialization) { + // Prevent scanning store more than once. + drainMaterialization = true; + + rightIt = untouched(); + } + + RowT emptyLeft = leftRowFactory.create(); + + while (requested > 0 && rightIt.hasNext()) { + if (rescheduleJoin()) + return; + + checkState(); + + RowT right = rightIt.next(); + + RowT row = outRowHnd.concat(emptyLeft, right); + + --requested; + + downstream().push(row); + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + drainMaterialization = false; + + super.rewindInternal(); + } + } + + /** */ + private static final class SemiHashJoin extends HashJoinNode { + /** + * Creates node for SEMI JOIN operator. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + private SemiHashJoin( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + @Nullable BiPredicate nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, false, nonEqCond); + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + checkState(); + + // Proceed with next left row, if previous was fully processed. + if (!rightIt.hasNext()) { + left = leftInBuf.remove(); + + rightIt = lookup(left).iterator(); + } + + boolean anyMatched = rightIt.hasNext() && nonEqCond == null; + + if (!anyMatched) { + // Find any matched row. + while (rightIt.hasNext()) { + RowT right = rightIt.next(); + + if (nonEqCond.test(left, right)) { + anyMatched = true; + + break; + } + + if (rescheduleJoin()) + return; + } + } + + if (anyMatched) { + requested--; + + downstream().push(left); + + rightIt = Collections.emptyIterator(); + } + + if (!rightIt.hasNext()) + left = null; + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + } + + /** */ + private static final class AntiHashJoin extends HashJoinNode { + /** + * Creates node for ANTI JOIN. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param nonEqCond Non-equi conditions. + */ + private AntiHashJoin( + ExecutionContext ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler outRowHnd, + @Nullable BiPredicate nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, false, nonEqCond); + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + checkState(); + + if (rescheduleJoin()) + return; + + left = leftInBuf.remove(); + + Collection rightRows = lookup(left); + + if (rightRows.isEmpty()) { + requested--; + + downstream().push(left); + } + + left = null; + } + } + finally { + inLoop = false; + } + } + + if (checkJoinFinished()) + return; + + tryToRequestInputs(); + } + } + + /** */ + private static final class TouchedArrayList extends ArrayList { + /** */ + private boolean touched; + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java index 998792f0f36c1..73998d1b72d11 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java @@ -191,7 +191,9 @@ public static IndexSpoolNode createHashSpool( Supplier searchRow, boolean allowNulls ) { - RuntimeHashIndex idx = new RuntimeHashIndex<>(ctx, keys, allowNulls); + ImmutableBitSet allowNulls0 = allowNulls ? ImmutableBitSet.range(keys.cardinality()) : ImmutableBitSet.of(); + + RuntimeHashIndex idx = new RuntimeHashIndex<>(ctx, keys, allowNulls0); ScanNode scan = new ScanNode<>( ctx, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java index abca013eafe58..f2f42e3b68dab 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/NestedLoopJoinNode.java @@ -17,212 +17,76 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.BitSet; -import java.util.Deque; import java.util.List; import java.util.function.BiPredicate; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; /** */ -public abstract class NestedLoopJoinNode extends MemoryTrackingNode { - /** */ - private static final int HALF_BUF_SIZE = IN_BUFFER_SIZE >> 1; - - /** Special value to highlights that all row were received and we are not waiting any more. */ - protected static final int NOT_WAITING = -1; - +public abstract class NestedLoopJoinNode extends AbstractRightMaterializedJoinNode { /** */ protected final BiPredicate cond; /** */ - protected final RowHandler handler; - - /** */ - protected int requested; - - /** */ - protected Row left; - - /** */ - protected int rightIdx; - - /** */ - protected int waitingLeft; - - /** */ - protected int waitingRight; + protected final RowHandler rowHnd; /** */ protected final List rightMaterialized = new ArrayList<>(IN_BUFFER_SIZE); /** */ - protected final Deque leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE); - - /** */ - protected boolean inLoop; + protected int rightIdx; /** * @param ctx Execution context. + * @param rowType Row type. * @param cond Join expression. */ private NestedLoopJoinNode(ExecutionContext ctx, RelDataType rowType, BiPredicate cond) { super(ctx, rowType); this.cond = cond; - handler = ctx.rowHandler(); - } - - /** {@inheritDoc} */ - @Override public void request(int rowsCnt) throws Exception { - assert !F.isEmpty(sources()) && sources().size() == 2; - assert rowsCnt > 0 && requested == 0; - - checkState(); - - requested = rowsCnt; - - if (!inLoop) - context().execute(this::doJoin, this::onError); - } - - /** */ - private void doJoin() throws Exception { - checkState(); - - join(); + rowHnd = ctx.rowHandler(); } /** {@inheritDoc} */ @Override protected void rewindInternal() { - requested = 0; - waitingLeft = 0; - waitingRight = 0; + super.rewindInternal(); - rightMaterialized.clear(); - leftInBuf.clear(); - - left = null; rightIdx = 0; - } - - /** {@inheritDoc} */ - @Override protected Downstream requestDownstream(int idx) { - if (idx == 0) - return new Downstream() { - /** {@inheritDoc} */ - @Override public void push(Row row) throws Exception { - pushLeft(row); - } - - /** {@inheritDoc} */ - @Override public void end() throws Exception { - endLeft(); - } - - /** {@inheritDoc} */ - @Override public void onError(Throwable e) { - NestedLoopJoinNode.this.onError(e); - } - }; - else if (idx == 1) - return new Downstream() { - /** {@inheritDoc} */ - @Override public void push(Row row) throws Exception { - pushRight(row); - } - - /** {@inheritDoc} */ - @Override public void end() throws Exception { - endRight(); - } - - /** {@inheritDoc} */ - @Override public void onError(Throwable e) { - NestedLoopJoinNode.this.onError(e); - } - }; - - throw new IndexOutOfBoundsException(); - } - - /** */ - private void pushLeft(Row row) throws Exception { - assert downstream() != null; - assert waitingLeft > 0; - - checkState(); - - waitingLeft--; - leftInBuf.add(row); - - join(); + rightMaterialized.clear(); } - /** */ - private void pushRight(Row row) throws Exception { + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { assert downstream() != null; assert waitingRight > 0; checkState(); + nodeMemoryTracker.onRowAdded(row); + waitingRight--; rightMaterialized.add(row); - nodeMemoryTracker.onRowAdded(row); - if (waitingRight == 0) rightSource().request(waitingRight = IN_BUFFER_SIZE); } /** */ - private void endLeft() throws Exception { - assert downstream() != null; - assert waitingLeft > 0; - - checkState(); - - waitingLeft = NOT_WAITING; - - join(); - } - - /** */ - private void endRight() throws Exception { - assert downstream() != null; - assert waitingRight > 0; - - checkState(); - - waitingRight = NOT_WAITING; - - join(); - } - - /** */ - protected Node leftSource() { - return sources().get(0); - } - - /** */ - protected Node rightSource() { - return sources().get(1); - } - - /** */ - protected abstract void join() throws Exception; - - /** */ - @NotNull public static NestedLoopJoinNode create(ExecutionContext ctx, RelDataType outputRowType, - RelDataType leftRowType, RelDataType rightRowType, JoinRelType joinType, BiPredicate cond) { + public static NestedLoopJoinNode create( + ExecutionContext ctx, + RelDataType outputRowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType joinType, + BiPredicate cond + ) { switch (joinType) { case INNER: return new InnerJoin<>(ctx, outputRowType, cond); @@ -253,21 +117,18 @@ protected Node rightSource() { return new AntiJoin<>(ctx, outputRowType, cond); default: - throw new IllegalStateException("Join type \"" + joinType + "\" is not supported yet"); + throw new IllegalArgumentException("Join type '" + joinType + "' is not supported."); } } /** */ private static class InnerJoin extends NestedLoopJoinNode { - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public InnerJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate cond) { super(ctx, rowType, cond); } - /** */ + /** {@inheritDoc} */ @Override protected void join() throws Exception { if (waitingRight == NOT_WAITING) { inLoop = true; @@ -283,7 +144,7 @@ public InnerJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate extends NestedLoopJoinNode { /** Right row factory. */ private final RowHandler.RowFactory rightRowFactory; - /** Whether current left row was matched or not. */ + /** Shows whether current left row was matched. */ private boolean matched; - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public LeftJoin( ExecutionContext ctx, RelDataType rowType, @@ -328,11 +186,11 @@ public LeftJoin( this.rightRowFactory = rightRowFactory; } - /** */ + /** {@inheritDoc} */ @Override protected void rewindInternal() { - matched = false; - super.rewindInternal(); + + matched = false; } /** {@inheritDoc} */ @@ -356,7 +214,7 @@ public LeftJoin( requested--; matched = true; - Row row = handler.concat(left, rightMaterialized.get(rightIdx - 1)); + Row row = rowHnd.concat(left, rightMaterialized.get(rightIdx - 1)); downstream().push(row); } @@ -367,7 +225,7 @@ public LeftJoin( requested--; wasPushed = true; - downstream().push(handler.concat(left, rightRowFactory.create())); + downstream().push(rowHnd.concat(left, rightRowFactory.create())); } if (matched || wasPushed) { @@ -391,7 +249,7 @@ public LeftJoin( /** */ private static class RightJoin extends NestedLoopJoinNode { - /** Right row factory. */ + /** Left row factory. */ private final RowHandler.RowFactory leftRowFactory; /** */ @@ -400,10 +258,7 @@ private static class RightJoin extends NestedLoopJoinNode { /** */ private int lastPushedInd; - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public RightJoin( ExecutionContext ctx, RelDataType rowType, @@ -417,10 +272,10 @@ public RightJoin( /** {@inheritDoc} */ @Override protected void rewindInternal() { + super.rewindInternal(); + rightNotMatchedIndexes.clear(); lastPushedInd = 0; - - super.rewindInternal(); } /** {@inheritDoc} */ @@ -449,7 +304,7 @@ public RightJoin( requested--; rightNotMatchedIndexes.clear(rightIdx - 1); - Row joined = handler.concat(left, right); + Row joined = rowHnd.concat(left, right); downstream().push(joined); } @@ -477,7 +332,7 @@ public RightJoin( if (lastPushedInd < 0) break; - Row row = handler.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -508,7 +363,7 @@ private static class FullOuterJoin extends NestedLoopJoinNode { /** Right row factory. */ private final RowHandler.RowFactory rightRowFactory; - /** Whether current left row was matched or not. */ + /** Shows whether current left row was matched. */ private boolean leftMatched; /** */ @@ -517,10 +372,7 @@ private static class FullOuterJoin extends NestedLoopJoinNode { /** */ private int lastPushedInd; - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public FullOuterJoin( ExecutionContext ctx, RelDataType rowType, @@ -536,11 +388,12 @@ public FullOuterJoin( /** {@inheritDoc} */ @Override protected void rewindInternal() { + super.rewindInternal(); + + left = null; leftMatched = false; rightNotMatchedIndexes.clear(); lastPushedInd = 0; - - super.rewindInternal(); } /** {@inheritDoc} */ @@ -573,7 +426,7 @@ public FullOuterJoin( leftMatched = true; rightNotMatchedIndexes.clear(rightIdx - 1); - Row joined = handler.concat(left, right); + Row joined = rowHnd.concat(left, right); downstream().push(joined); } @@ -584,7 +437,7 @@ public FullOuterJoin( requested--; wasPushed = true; - downstream().push(handler.concat(left, rightRowFactory.create())); + downstream().push(rowHnd.concat(left, rightRowFactory.create())); } if (leftMatched || wasPushed) { @@ -612,7 +465,7 @@ public FullOuterJoin( if (lastPushedInd < 0) break; - Row row = handler.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); + Row row = rowHnd.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd)); rightNotMatchedIndexes.clear(lastPushedInd); @@ -637,10 +490,7 @@ public FullOuterJoin( /** */ private static class SemiJoin extends NestedLoopJoinNode { - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public SemiJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate cond) { super(ctx, rowType, cond); } @@ -682,10 +532,7 @@ public SemiJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate extends NestedLoopJoinNode { - /** - * @param ctx Execution context. - * @param cond Join expression. - */ + /** */ public AntiJoin(ExecutionContext ctx, RelDataType rowType, BiPredicate cond) { super(ctx, rowType, cond); } @@ -742,13 +589,4 @@ protected boolean checkJoinFinished() throws Exception { return false; } - - /** */ - protected void tryToRequestInputs() throws Exception { - if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE) - leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); - - if (waitingRight == 0 && requested > 0) - rightSource().request(waitingRight = IN_BUFFER_SIZE); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/hint/HintDefinition.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/hint/HintDefinition.java index a32c028482a1c..f8169935d8746 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/hint/HintDefinition.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/hint/HintDefinition.java @@ -179,6 +179,32 @@ public enum HintDefinition { @Override public HintOptionsChecker optionsChecker() { return CNL_JOIN.optionsChecker(); } + }, + + /** Forces hash join. */ + HASH_JOIN { + /** {@inheritDoc} */ + @Override public HintPredicate predicate() { + return joinHintPredicate(); + } + + /** {@inheritDoc} */ + @Override public HintOptionsChecker optionsChecker() { + return HintsConfig.OPTS_CHECK_NO_KV; + } + }, + + /** Disables hash join. */ + NO_HASH_JOIN { + /** {@inheritDoc} */ + @Override public HintPredicate predicate() { + return HASH_JOIN.predicate(); + } + + /** {@inheritDoc} */ + @Override public HintOptionsChecker optionsChecker() { + return HASH_JOIN.optionsChecker(); + } }; /** diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java index 82023d9bc0986..ee6acfd35b9b7 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; @@ -145,6 +146,12 @@ private IgniteReceiver collect(IgniteReceiver receiver) { visit((IgniteRel)rel.getRight()))); } + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteHashJoin rel) { + return rel.clone(cluster, F.asList(visit((IgniteRel)rel.getLeft()), + visit((IgniteRel)rel.getRight()))); + } + /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteMergeJoin rel) { return rel.clone(cluster, F.asList(visit((IgniteRel)rel.getLeft()), diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java index a489e9f8616bf..62cb65fd95abe 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; @@ -80,6 +81,11 @@ public class IgniteRelShuttle implements IgniteRelVisitor { return processNode(rel); } + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteHashJoin rel) { + return processNode(rel); + } + /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) { return processNode(rel); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java index 5fc738cbc9192..0ff9ae9510bf8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeToHashIndexSpoolRule; import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeToSortedIndexSpoolRule; import org.apache.ignite.internal.processors.query.calcite.rule.HashAggregateConverterRule; +import org.apache.ignite.internal.processors.query.calcite.rule.HashJoinConverterRule; import org.apache.ignite.internal.processors.query.calcite.rule.IndexCountRule; import org.apache.ignite.internal.processors.query.calcite.rule.IndexMinMaxRule; import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule; @@ -275,6 +276,7 @@ public enum PlannerPhase { CorrelatedNestedLoopJoinRule.INSTANCE, CorrelateToNestedLoopRule.INSTANCE, NestedLoopJoinConverterRule.INSTANCE, + HashJoinConverterRule.INSTANCE, ValuesConverterRule.INSTANCE, LogicalScanConverterRule.INDEX_SCAN, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java new file mode 100644 index 0000000000000..0b0c58c09122f --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; + +/** */ +public class IgniteHashJoin extends AbstractIgniteJoin { + /** */ + private static final double DISTINCT_RIGHT_ROWS_RATIO = 0.9; + + /** */ + public IgniteHashJoin( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode left, + RelNode right, + RexNode condition, + Set variablesSet, + JoinRelType joinType + ) { + super(cluster, traitSet, left, right, condition, variablesSet, joinType); + } + + /** Constructor. */ + public IgniteHashJoin(RelInput input) { + this( + input.getCluster(), + input.getTraitSet().replace(IgniteConvention.INSTANCE), + input.getInputs().get(0), + input.getInputs().get(1), + input.getExpression("condition"), + Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)), + input.getEnum("joinType", JoinRelType.class) + ); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + double leftRowCnt = mq.getRowCount(getLeft()); + + double rightRowCnt = mq.getRowCount(getRight()); + + if (Double.isInfinite(leftRowCnt) || Double.isInfinite(rightRowCnt)) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCnt = leftRowCnt + rightRowCnt; + + int rightKeysSize = joinInfo.rightKeys.size(); + + double rightSize = rightRowCnt * IgniteCost.AVERAGE_FIELD_SIZE * getRight().getRowType().getFieldCount(); + + double distRightRows = Util.first(mq.getDistinctRowCount(right, ImmutableBitSet.of(joinInfo.rightKeys), null), + DISTINCT_RIGHT_ROWS_RATIO * rightRowCnt); + + rightSize += distRightRows * rightKeysSize * IgniteCost.AVERAGE_FIELD_SIZE; + + return costFactory.makeCost(rowCnt, rowCnt * IgniteCost.HASH_LOOKUP_COST, 0, rightSize, 0); + } + + /** {@inheritDoc} */ + @Override public Join copy( + RelTraitSet traitSet, + RexNode condition, + RelNode left, + RelNode right, + JoinRelType joinType, + boolean semiJoinDone + ) { + return new IgniteHashJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType); + } + + /** {@inheritDoc} */ + @Override public T accept(IgniteRelVisitor visitor) { + return visitor.visit(this); + } + + /** {@inheritDoc} */ + @Override public IgniteRel clone(RelOptCluster cluster, List inputs) { + return new IgniteHashJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(), + getVariablesSet(), getJoinType()); + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoinInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoinInfo.java index ea3db26098233..95e2ecbd6b432 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoinInfo.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoinInfo.java @@ -26,6 +26,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.util.typedef.F; /** Extended {@link JoinInfo}. */ public class IgniteJoinInfo extends JoinInfo { @@ -33,7 +34,7 @@ public class IgniteJoinInfo extends JoinInfo { private final ImmutableBitSet allowNulls; /** */ - protected IgniteJoinInfo( + public IgniteJoinInfo( ImmutableIntList leftKeys, ImmutableIntList rightKeys, ImmutableBitSet allowNulls, @@ -54,17 +55,21 @@ public static IgniteJoinInfo of(Join join) { RelOptUtil.splitJoinCondition(join.getLeft(), join.getRight(), join.getCondition(), leftKeys, rightKeys, skipNulls, nonEquis); - ImmutableBitSet.Builder allowNulls = ImmutableBitSet.builder(); + ImmutableBitSet.Builder allowNulls = null; - for (int i = 0; i < skipNulls.size(); ++i) { - if (!skipNulls.get(i)) - allowNulls.set(i); + if (!F.isEmpty(skipNulls)) { + allowNulls = ImmutableBitSet.builder(); + + for (int i = 0; i < skipNulls.size(); ++i) { + if (!skipNulls.get(i)) + allowNulls.set(i); + } } return new IgniteJoinInfo( ImmutableIntList.copyOf(leftKeys), ImmutableIntList.copyOf(rightKeys), - allowNulls == null ? ImmutableBitSet.of() : ImmutableBitSet.of(allowNulls.build()), + allowNulls == null ? ImmutableBitSet.of() : allowNulls.build(), ImmutableList.copyOf(nonEquis) ); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java index 0dc93f1cbb330..0f68002fb9570 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java @@ -54,6 +54,11 @@ public interface IgniteRelVisitor { */ T visit(IgniteNestedLoopJoin rel); + /** + * See {@link IgniteRelVisitor#visit(IgniteRel)}. + */ + T visit(IgniteHashJoin rel); + /** * See {@link IgniteRelVisitor#visit(IgniteRel)} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteJoinConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteJoinConverterRule.java index a79322b071182..753e3750b29ad 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteJoinConverterRule.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/AbstractIgniteJoinConverterRule.java @@ -40,9 +40,11 @@ import static org.apache.calcite.util.Util.last; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.CNL_JOIN; +import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.HASH_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.MERGE_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NL_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_CNL_JOIN; +import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_HASH_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_MERGE_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_NL_JOIN; @@ -64,6 +66,7 @@ abstract class AbstractIgniteJoinConverterRule extends AbstractIgniteConverterRu HINTS.put(NL_JOIN, NO_NL_JOIN); HINTS.put(CNL_JOIN, NO_CNL_JOIN); HINTS.put(MERGE_JOIN, NO_MERGE_JOIN); + HINTS.put(HASH_JOIN, NO_HASH_JOIN); ALL_HINTS = Stream.concat(HINTS.keySet().stream(), HINTS.values().stream()).toArray(HintDefinition[]::new); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashJoinConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashJoinConverterRule.java new file mode 100644 index 0000000000000..de1016d3c511a --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashJoinConverterRule.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.EnumSet; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; + +/** Hash join converter rule. */ +public class HashJoinConverterRule extends AbstractIgniteJoinConverterRule { + /** */ + public static final RelOptRule INSTANCE = new HashJoinConverterRule(); + + /** */ + private static final EnumSet NON_EQ_CONDITIONS_SUPPORT = EnumSet.of(JoinRelType.INNER, JoinRelType.SEMI); + + /** Ctor. */ + private HashJoinConverterRule() { + super("HashJoinConverter", HintDefinition.HASH_JOIN); + } + + /** {@inheritDoc} */ + @Override public boolean matchesJoin(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + + IgniteJoinInfo joinInfo = IgniteJoinInfo.of(join); + + if (joinInfo.pairs().isEmpty()) + return false; + + // Current limitation: unmatched products on left or right part requires special handling of non-equi condition + // on execution level. + return joinInfo.isEqui() || NON_EQ_CONDITIONS_SUPPORT.contains(join.getJoinType()); + } + + /** {@inheritDoc} */ + @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, LogicalJoin rel) { + RelOptCluster cluster = rel.getCluster(); + RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE); + RelTraitSet leftInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE); + RelTraitSet rightInTraits = cluster.traitSetOf(IgniteConvention.INSTANCE); + RelNode left = convert(rel.getLeft(), leftInTraits); + RelNode right = convert(rel.getRight(), rightInTraits); + + return new IgniteHashJoin(cluster, outTraits, left, right, rel.getCondition(), rel.getVariablesSet(), rel.getJoinType()); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java index 9493fcb970159..5c54c4cad4252 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java @@ -217,7 +217,8 @@ public void testCountWithJoin() throws Exception { awaitPartitionMapExchange(true, true, null); - List joinConverters = Arrays.asList("CorrelatedNestedLoopJoin", "MergeJoinConverter", "NestedLoopJoinConverter"); + List joinConverters = Arrays.asList("CorrelatedNestedLoopJoin", "MergeJoinConverter", "NestedLoopJoinConverter", + "HashJoinConverter"); // CorrelatedNestedLoopJoin skipped intentionally since it takes too long to finish // the query with only CNLJ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index b38043b404ba4..3b33a0a143e40 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -336,6 +337,11 @@ protected QueryTaskExecutor taskExecutor(UUID nodeId) { return taskExecutors.get(nodeId); } + /** */ + protected ExecutionContext executionContext() { + return executionContext(nodes.get(new Random().nextInt(nodesCnt)), UUID.randomUUID(), 0); + } + /** */ protected ExecutionContext executionContext(UUID nodeId, UUID qryId, long fragmentId) { FragmentDescription fragmentDesc = new FragmentDescription(fragmentId, null, null, null); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinExecutionTest.java new file mode 100644 index 0000000000000..27776792a55f1 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinExecutionTest.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.function.BiPredicate; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.calcite.rel.core.JoinRelType.ANTI; +import static org.apache.calcite.rel.core.JoinRelType.FULL; +import static org.apache.calcite.rel.core.JoinRelType.INNER; +import static org.apache.calcite.rel.core.JoinRelType.LEFT; +import static org.apache.calcite.rel.core.JoinRelType.RIGHT; +import static org.apache.calcite.rel.core.JoinRelType.SEMI; + +/** */ +public class HashJoinExecutionTest extends AbstractExecutionTest { + /** */ + @Test + public void testHashJoinRewind() { + ExecutionContext ctx = executionContext(); + + RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), Integer.class, String.class); + RelDataType rightType = TypeUtils.createRowType(ctx.getTypeFactory(), Integer.class, String.class, Integer.class); + + ScanNode deps = new ScanNode<>( + ctx, + rightType, + Arrays.asList( + new Object[] {1, "Core"}, + new Object[] {2, "SQL"}, + new Object[] {3, "QA"} + )); + + ScanNode persons = new ScanNode<>( + ctx, + leftType, + Arrays.asList( + new Object[] {0, "Igor", 1}, + new Object[] {1, "Roman", 2}, + new Object[] {2, "Ivan", 5}, + new Object[] {3, "Alexey", 1} + )); + + HashJoinNode join = createJoinNode(ctx, LEFT, leftType, rightType, null); + + join.register(F.asList(persons, deps)); + + ProjectNode project = new ProjectNode<>(ctx, join.rowType(), r -> new Object[] {r[2], r[3], r[1]}); + project.register(join); + + RootRewindable node = new RootRewindable<>(ctx, project.rowType()); + + node.register(project); + + assert node.hasNext(); + + ArrayList rows = new ArrayList<>(); + + while (node.hasNext()) + rows.add(node.next()); + + Object[][] expected = { + {1, 1, "Igor"}, + {1, 1, "Alexey"}, + {2, 2, "Roman"}, + {5, null, "Ivan"}, + }; + + checkResults(expected, rows); + + node.rewind(); + + assert node.hasNext(); + + rows.clear(); + + while (node.hasNext()) + rows.add(node.next()); + + checkResults(expected, rows); + } + + /** */ + @Test + public void testEquiJoinWithDifferentBufferSize() { + for (JoinRelType joinType : F.asList(INNER, LEFT, RIGHT, FULL, SEMI, ANTI)) { + validateEquiJoin(joinType, 0, 0); + validateEquiJoin(joinType, 0, 1); + validateEquiJoin(joinType, 0, 10); + validateEquiJoin(joinType, 1, 0); + validateEquiJoin(joinType, 1, 1); + validateEquiJoin(joinType, 1, 10); + validateEquiJoin(joinType, 10, 0); + validateEquiJoin(joinType, 10, 1); + validateEquiJoin(joinType, 10, 10); + + int testSize = IN_BUFFER_SIZE; + + validateEquiJoin(joinType, 0, testSize - 1); + validateEquiJoin(joinType, 0, testSize); + validateEquiJoin(joinType, 0, testSize + 1); + + validateEquiJoin(joinType, testSize - 1, 0); + validateEquiJoin(joinType, testSize - 1, testSize - 1); + validateEquiJoin(joinType, testSize - 1, testSize); + validateEquiJoin(joinType, testSize - 1, testSize + 1); + + validateEquiJoin(joinType, testSize, 0); + validateEquiJoin(joinType, testSize, testSize - 1); + validateEquiJoin(joinType, testSize, testSize); + validateEquiJoin(joinType, testSize, testSize + 1); + + validateEquiJoin(joinType, testSize + 1, 0); + validateEquiJoin(joinType, testSize + 1, testSize - 1); + validateEquiJoin(joinType, testSize + 1, testSize); + validateEquiJoin(joinType, testSize + 1, testSize + 1); + + validateEquiJoin(joinType, 2 * testSize, 0); + validateEquiJoin(joinType, 0, 2 * testSize); + validateEquiJoin(joinType, 2 * testSize, 2 * testSize); + } + } + + /** */ + @Test + public void testNonEquiJoinWithDifferentBufferSize() { + JoinRelType joinType = INNER; + + validateNonEquiJoin(joinType, 0, 0); + validateNonEquiJoin(joinType, 0, 1); + validateNonEquiJoin(joinType, 0, 10); + validateNonEquiJoin(joinType, 1, 0); + validateNonEquiJoin(joinType, 1, 1); + validateNonEquiJoin(joinType, 1, 10); + validateNonEquiJoin(joinType, 10, 0); + validateNonEquiJoin(joinType, 10, 1); + validateNonEquiJoin(joinType, 10, 10); + + int testSize = IN_BUFFER_SIZE; + + validateNonEquiJoin(joinType, 0, testSize - 1); + validateNonEquiJoin(joinType, 0, testSize); + validateNonEquiJoin(joinType, 0, testSize + 1); + + validateNonEquiJoin(joinType, testSize - 1, 0); + validateNonEquiJoin(joinType, testSize - 1, testSize - 1); + validateNonEquiJoin(joinType, testSize - 1, testSize); + validateNonEquiJoin(joinType, testSize - 1, testSize + 1); + + validateNonEquiJoin(joinType, testSize, 0); + validateNonEquiJoin(joinType, testSize, testSize - 1); + validateNonEquiJoin(joinType, testSize, testSize); + validateNonEquiJoin(joinType, testSize, testSize + 1); + + validateNonEquiJoin(joinType, testSize + 1, 0); + validateNonEquiJoin(joinType, testSize + 1, testSize - 1); + validateNonEquiJoin(joinType, testSize + 1, testSize); + validateNonEquiJoin(joinType, testSize + 1, testSize + 1); + + validateNonEquiJoin(joinType, 2 * testSize, 0); + validateNonEquiJoin(joinType, 0, 2 * testSize); + validateNonEquiJoin(joinType, 2 * testSize, 2 * testSize); + } + + /** */ + @Test + public void testInnerJoinWithPostFiltration() { + Object[][] persons = { + new Object[] {0, "Igor", 1}, + new Object[] {1, "Roman", 2}, + new Object[] {2, "Ivan", 5}, + new Object[] {3, "Alexey", 1} + }; + + Object[][] deps = { + new Object[] {1, "Core"}, + new Object[] {2, "SQL"}, + new Object[] {3, "QA"} + }; + + BiPredicate condition = (l, r) -> ((CharSequence)r[1]).length() > 3 && ((CharSequence)l[1]).length() > 4; + + Object[][] expected = {{3, "Alexey", 1, 1, "Core"}}; + + validate(INNER, Stream.of(persons)::iterator, Stream.of(deps)::iterator, expected, -1, condition); + } + + /** */ + @Test + public void testSemiJoinWithPostFiltration() { + Object[][] persons = { + new Object[] {0, "Igor", 1}, + new Object[] {1, "Roman", 2}, + new Object[] {2, "Ivan", 5}, + new Object[] {3, "Alexey", 1} + }; + + Object[][] deps = { + new Object[] {1, "Core"}, + new Object[] {2, "SQL"}, + new Object[] {3, "QA"} + }; + + BiPredicate condition = (l, r) -> ((CharSequence)r[1]).length() > 3 && ((CharSequence)l[1]).length() > 4; + + Object[][] expected = {{3, "Alexey", 1}}; + + validate(SEMI, Stream.of(persons)::iterator, Stream.of(deps)::iterator, expected, -1, condition); + } + + /** */ + private void validateEquiJoin(JoinRelType joinType, int leftSize, int rightSize) { + if (log.isInfoEnabled()) { + log.info("Testing eq. join with different buffer sizes. Join type: " + joinType + ", leftSz: " + + leftSize + ", rightSz: " + rightSize); + } + + { // Distinct inputs + Object[] person = {1, "name", 2}; + Object[] department = {1, "department"}; + + int resultSize = estimateResultSizeForDistinctInputs(joinType, leftSize, rightSize); + + validate( + joinType, + () -> IntStream.range(0, leftSize).mapToObj(i -> person).iterator(), + () -> IntStream.range(0, rightSize).mapToObj(i -> department).iterator(), + null, + resultSize, + null + ); + } + + { // Matching inputs + Object[] person = {1, "name", 2}; + Object[] department = {2, "department"}; + + int resultSize = estimateResultSizeForEqualInputs(joinType, leftSize, rightSize); + + validate( + joinType, + () -> IntStream.range(0, leftSize).mapToObj(i -> person).iterator(), + () -> IntStream.range(0, rightSize).mapToObj(i -> department).iterator(), + null, + resultSize, + null + ); + } + } + + /** */ + protected void validateNonEquiJoin(JoinRelType joinType, int leftSize, int rightSize) { + Object[] person = {1, "name", 2}; + Object[] department = {2, "department"}; + + int resultSize = estimateResultSizeForEqualInputs(joinType, leftSize, rightSize); + + validate( + joinType, + () -> IntStream.range(0, leftSize).mapToObj(i -> person).iterator(), + () -> IntStream.range(0, rightSize).mapToObj(i -> department).iterator(), + null, + resultSize, + (l, r) -> true + ); + + validate( + joinType, + () -> IntStream.range(0, leftSize).mapToObj(i -> person).iterator(), + () -> IntStream.range(0, rightSize).mapToObj(i -> department).iterator(), + null, + 0, + (l, r) -> false + ); + } + + /** */ + private static int estimateResultSizeForDistinctInputs(JoinRelType joinType, int leftSize, int rightSize) { + switch (joinType) { + case SEMI: // Fallthrough + case INNER: + return 0; + case ANTI: // Fallthrough + case LEFT: + return leftSize; + case RIGHT: + return rightSize; + case FULL: + return leftSize + rightSize; + default: + throw new IllegalArgumentException("Unsupported join type: " + joinType); + } + } + + /** */ + private static int estimateResultSizeForEqualInputs( + JoinRelType joinType, + int leftSize, + int rightSize + ) { + switch (joinType) { + case SEMI: + return rightSize == 0 ? 0 : leftSize; + case ANTI: + return rightSize == 0 ? leftSize : 0; + case LEFT: + return rightSize == 0 ? leftSize : leftSize * rightSize; + case RIGHT: + return leftSize == 0 ? rightSize : leftSize * rightSize; + case FULL: + return leftSize == 0 ? rightSize : rightSize == 0 ? leftSize : leftSize * rightSize; + case INNER: + return leftSize * rightSize; + default: + throw new IllegalArgumentException("Unsupported join type: " + joinType); + } + } + + /** */ + private void validate( + JoinRelType joinType, + Iterable leftSrc, + Iterable rightSrc, + @Nullable Object[][] expected, + int resultSize, + @Nullable BiPredicate postCondition + ) { + ExecutionContext ctx = executionContext(); + + IgniteTypeFactory tf = ctx.getTypeFactory(); + + RelDataType leftType = TypeUtils.createRowType(tf, Integer.class, String.class, Integer.class); + RelDataType rightType = TypeUtils.createRowType(tf, Integer.class, String.class); + + ScanNode left = new ScanNode<>(ctx, leftType, leftSrc); + ScanNode right = new ScanNode<>(ctx, rightType, rightSrc); + + AbstractRightMaterializedJoinNode join = createJoinNode(ctx, joinType, leftType, rightType, postCondition); + + join.register(F.asList(left, right)); + + RootNode node = new RootNode<>(ctx, join.rowType()); + + node.register(join); + + ArrayList result = new ArrayList<>(); + + while (node.hasNext()) + result.add(node.next()); + + if (resultSize >= 0) + assertEquals(resultSize, result.size()); + + if (expected != null) + checkResults(expected, result); + } + + /** */ + private static HashJoinNode createJoinNode( + ExecutionContext ctx, + JoinRelType joinType, + RelDataType leftType, + RelDataType rightType, + @Nullable BiPredicate postCondition + ) { + IgniteTypeFactory tf = ctx.getTypeFactory(); + + RelDataType outType = (joinType == ANTI || joinType == SEMI) + ? leftType + : TypeUtils.combinedRowType(tf, leftType, rightType); + + IgniteJoinInfo joinInfo = new IgniteJoinInfo(ImmutableIntList.of(2), ImmutableIntList.of(0), + ImmutableBitSet.of(), ImmutableList.of()); + + return HashJoinNode.create(ctx, outType, leftType, rightType, joinType, joinInfo, postCondition); + } + + /** */ + private static void checkResults(Object[][] expected, ArrayList actual) { + assertEquals(expected.length, actual.size()); + + actual.sort(Comparator.comparing(r -> (int)r[0])); + + int length = expected.length; + + for (int i = 0; i < length; ++i) { + Object[] exp = expected[i]; + Object[] act = actual.get(i); + + assertEquals(exp.length, act.length); + assertEquals(0, F.compareArrays(exp, act)); + } + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java index f5b01c376fcb3..397fb702f4462 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/JoinBuffersExecutionTest.java @@ -23,12 +23,15 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.stream.IntStream; +import com.google.common.collect.ImmutableList; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.GridTestUtils; @@ -43,15 +46,13 @@ import static org.apache.calcite.rel.core.JoinRelType.RIGHT; import static org.apache.calcite.rel.core.JoinRelType.SEMI; -/** - * - */ +/** Tests that buffers of join nodes are cleared at the join end and that a join node is not stuck. */ @RunWith(Parameterized.class) public class JoinBuffersExecutionTest extends AbstractExecutionTest { - /** Tests merge join with input bigger that the buffer size. */ + /** */ @Test public void testMergeJoinBuffers() throws Exception { - JoinFactory joinFactory = (ctx, outType, leftType, rightType, joinType, cond) -> + JoinFactory joinFactory = (ctx, outType, leftType, rightType, joinType) -> MergeJoinNode.create(ctx, outType, leftType, rightType, joinType, Comparator.comparingInt(r -> (Integer)r[0]), true); Consumer> bufChecker = (node) -> { @@ -60,28 +61,47 @@ public void testMergeJoinBuffers() throws Exception { assertTrue(((MergeJoinNode)node).rightInBuf.size() <= IN_BUFFER_SIZE); }; - doTestJoinBuffer(joinFactory, bufChecker); + doTestJoinBuffer(joinFactory, bufChecker, false); } - /** Tests NL with input bigger that the buffer size. */ + /** */ @Test public void testNLJoinBuffers() throws Exception { - JoinFactory joinFactory = (ctx, outType, leftType, rightType, joinType, cond) -> + JoinFactory joinFactory = (ctx, outType, leftType, rightType, joinType) -> NestedLoopJoinNode.create(ctx, outType, leftType, rightType, joinType, (r1, r2) -> r1[0].equals(r2[0])); Consumer> bufChecker = (node) -> - assertTrue(((NestedLoopJoinNode)node).leftInBuf.size() <= IN_BUFFER_SIZE); + assertTrue(((AbstractRightMaterializedJoinNode)node).leftInBuf.size() <= IN_BUFFER_SIZE); + + doTestJoinBuffer(joinFactory, bufChecker, false); + } + + /** */ + @Test + public void testHashJoinBuffers() throws Exception { + IgniteJoinInfo joinInfo = new IgniteJoinInfo(ImmutableIntList.of(0), ImmutableIntList.of(0), + ImmutableBitSet.of(), ImmutableList.of()); + + JoinFactory joinFactory = (ctx, outType, leftType, rightType, joinType) -> + HashJoinNode.create(ctx, outType, leftType, rightType, joinType, joinInfo, null); + + Consumer> bufChecker = (node) -> + assertTrue(((AbstractRightMaterializedJoinNode)node).leftInBuf.size() <= IN_BUFFER_SIZE); - doTestJoinBuffer(joinFactory, bufChecker); + doTestJoinBuffer(joinFactory, bufChecker, true); } /** + * Tests a join with input bigger that the buffer size. + * * @param joinFactory Creates certain join node. * @param joinBufChecker Finally check node after successfull run. + * @param sortResults If {@code true}, sorts results before checking. */ private void doTestJoinBuffer( JoinFactory joinFactory, - Consumer> joinBufChecker + Consumer> joinBufChecker, + boolean sortResults ) throws Exception { for (JoinRelType joinType : F.asList(LEFT, INNER, RIGHT, FULL, SEMI, ANTI)) { if (log.isInfoEnabled()) @@ -106,8 +126,7 @@ private void doTestJoinBuffer( RelDataType outType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, int.class); - AbstractNode join = joinFactory.create(ctx, outType, leftType, rightType, joinType, - (r1, r2) -> r1[0].equals(r2[0])); + AbstractNode join = joinFactory.create(ctx, outType, leftType, rightType, joinType); join.register(F.asList(leftNode, rightNode)); @@ -139,6 +158,10 @@ private void doTestJoinBuffer( assertTrue(GridTestUtils.waitForCondition(finished::get, getTestTimeout())); + // Sorting might be needed because join may not produce a sorted result. + if (sortResults) + sortResults(res, joinType); + switch (joinType) { case LEFT: assertEquals(size + intersect, res.size()); @@ -222,6 +245,36 @@ private void doTestJoinBuffer( } } + /** */ + private static void sortResults(List res, JoinRelType joinType) { + res.sort(new Comparator<>() { + @Override public int compare(Object[] row0, Object[] row1) { + assert row0.length == row1.length; + + int v1; + int v2; + + if (joinType == SEMI || joinType == ANTI) { + assert row0.length == 1; + assert row0[0] != null && row1[0] != null; + + v1 = (int)row0[0]; + v2 = (int)row1[0]; + } + else { + assert row0.length == 2; + assert (row0[0] == row0[1] && row0[0] != null) || row0[0] != null || row0[1] != null; + assert (row1[0] == row1[1] && row1[0] != null) || row1[0] != null || row1[1] != null; + + v1 = (int)(row0[0] == null ? row0[1] : row0[0]); + v2 = (int)(row1[0] == null ? row1[1] : row1[0]); + } + + return Integer.compare(v1, v2); + } + }); + } + /** */ @FunctionalInterface protected interface JoinFactory { @@ -231,8 +284,7 @@ AbstractNode create( RelDataType outType, RelDataType leftType, RelDataType rightType, - JoinRelType joinType, - BiPredicate cond + JoinRelType joinType ); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java index e9af5c24ed801..8e81db6d4f1de 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinIntegrationTest.java @@ -50,14 +50,14 @@ public static List params() { @Override protected void init() throws Exception { super.init(); - executeSql("create table t1 (c1 int, c2 int, c3 int) WITH " + atomicity()); - executeSql("create table t2 (c1 int, c2 int, c3 int) WITH " + atomicity()); + sql("create table t1 (c1 int, c2 int, c3 int) WITH " + atomicity()); + sql("create table t2 (c1 int, c2 int, c3 int) WITH " + atomicity()); - executeSql("create index t1_idx on t1 (c3, c2, c1)"); - executeSql("create index t2_idx on t2 (c3, c2, c1)"); + sql("create index t1_idx on t1 (c3, c2, c1)"); + sql("create index t2_idx on t2 (c3, c2, c1)"); - executeSql("insert into t1 values (1, 1, 1), (2, null, 2), (2, 2, 2), (3, 3, null), (3, 3, 3), (4, 4, 4)"); - executeSql("insert into t2 values (1, 1, 1), (2, 2, null), (2, 2, 2), (3, null, 3), (3, 3, 3), (4, 4, 4)"); + sql("insert into t1 values (1, 1, 1), (2, null, 2), (2, 2, 2), (3, 3, null), (3, 3, 3), (4, 4, 4)"); + sql("insert into t2 values (1, 1, 1), (2, 2, null), (2, 2, 2), (3, null, 3), (3, 3, 3), (4, 4, 4)"); } /** {@inheritDoc} */ @@ -109,7 +109,7 @@ public void testIsNotDistinctWithEquiConditionFrom() { } /** - * Test verifies result of inner join with different ordering. + * Test verifies result of inner join. */ @Test public void testInnerJoin() { @@ -294,10 +294,86 @@ public void testInnerJoin() { .returns(4, 4, 4, 4, 4) .returns(null, 3, 3, 3, 3) .check(); + + assertQuery("select t1.c2, t2.c3 from t1 join t2 on t1.c2 is not distinct from t2.c3 order by t1.c2, t2.c3") + .returns(1, 1) + .returns(2, 2) + .returns(3, 3) + .returns(3, 3) + .returns(3, 3) + .returns(3, 3) + .returns(4, 4) + .returns(null, null) + .check(); + + assertQuery("select t1.c2, t1.c3, t2.c1, t2.c3 from t1 join t2 on t1.c2 is not distinct from t2.c3 and " + + "t1.c3 is not distinct from t2.c1 order by t1.c2, t1.c3, t2.c1, t2.c3") + .returns(1, 1, 1, 1) + .returns(2, 2, 2, 2) + .returns(3, 3, 3, 3) + .returns(3, 3, 3, 3) + .returns(4, 4, 4, 4) + .returns(null, 2, 2, null) + .check(); + + assertQuery("select t1.c2, t1.c3, t2.c3 from t1 join t2 on t1.c2 is not distinct from t2.c3 and t1.c3 > 3") + .returns(4, 4, 4) + .check(); + + // MERGE JOIN doesn't support non-equi conditions. + if (joinType != JoinType.MERGE) { + assertQuery("select t1.c2, t1.c3, t2.c2+3 as t2c2, t2.c3 from t1 join t2 on t1.c2 is not distinct from t2.c3 and " + + "t1.c3 3" + + " order by t1.c2, t1.c3, t2.c3") + .returns(1, 1, null) + .returns(2, 2, null) + .returns(3, 3, null) + .returns(3, null, null) + .returns(4, 4, 4) + .returns(null, 2, null) + .check(); + + assertQuery("select t1.c2, t1.c3, t2.c2+3 as t2c2, t2.c3 from t1 left join t2 on t1.c2 is not distinct from t2.c3 and " + + "t1.c3 3" + + " order by t1.c2, t1.c3, t2.c3") + .returns(4, 4, 4) + .returns(null, null, 1) + .returns(null, null, 2) + .returns(null, null, 3) + .returns(null, null, 3) + .returns(null, null, null) + .check(); + + assertQuery("select t1.c2, t1.c3, t2.c2+3 as t2c2, t2.c3 from t1 right join t2 on t1.c2 is not distinct from t2.c3 and " + + "t1.c3 3" + + " order by t1.c2, t1.c3, t2.c3") + .returns(1, 1, null) + .returns(2, 2, null) + .returns(3, 3, null) + .returns(3, null, null) + .returns(4, 4, 4) + .returns(null, 2, null) + .returns(null, null, 1) + .returns(null, null, 2) + .returns(null, null, 3) + .returns(null, null, 3) + .returns(null, null, null) + .check(); + + assertQuery("select t1.c2, t1.c3, t2.c2+3 as t2c2, t2.c3 from t1 full join t2 on t1.c2 is not distinct from t2.c3 and " + + "t1.c3> params = F.asList(F.asList(HintDefinition.NL_JOIN.name(), "NestedLoopJoin"), + F.asList(HintDefinition.HASH_JOIN.name(), "IgniteHashJoin")); + + for (List params0 : params) { + assertQuery("SELECT /*+ " + params0.get(0) + " */ tbl.id, tbl.b, tbl2.id, tbl2.b FROM tbl JOIN tbl2 USING (id)") + .matches(QueryChecker.containsSubPlan(params0.get(1))) + .resultSize(800) + .check(); + } for (int i = 800; i < 1000; i++) sql("INSERT INTO tbl2 VALUES (?, ?)", i, new byte[1000]); - assertThrows("SELECT /*+ NL_JOIN */ tbl.id, tbl.b, tbl2.id, tbl2.b FROM tbl JOIN tbl2 USING (id)", - IgniteException.class, "Query quota exceeded"); + for (List paramSet : params) { + assertThrows("SELECT /*+ " + paramSet.get(0) + " */ tbl.id, tbl.b, tbl2.id, tbl2.b FROM tbl JOIN tbl2 USING (id)", + IgniteException.class, "Query quota exceeded"); + } } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java index 84047593d7b20..55ad49ace4c93 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java @@ -59,7 +59,7 @@ public void testValidIndexExpressions() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter" ); System.out.println("+++ " + RelOptUtil.toString(phys)); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java index 2b3e17264c200..97e6a4debbb70 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedSubqueryPlannerTest.java @@ -336,8 +336,8 @@ public void testCorrelatedDistribution() throws Exception { // Correlate on top of another join. sql = "SELECT a FROM ta1 WHERE ta1.a IN (SELECT a FROM th1) AND EXISTS (SELECT 1 FROM ta2 WHERE ta2.b = ta1.a)"; assertPlan(sql, schema, hasChildThat(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) - .and(input(0, isInstanceOf(AbstractIgniteJoin.class))) - .and(input(1, isInstanceOf(IgniteColocatedAggregateBase.class) + .and(input(0, nodeOrAnyChild(isInstanceOf(AbstractIgniteJoin.class)))) + .and(input(1, nodeOrAnyChild(isInstanceOf(IgniteColocatedAggregateBase.class)) .and(hasChildThat(isInstanceOf(IgniteExchange.class)).negate()) )))); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashIndexSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashIndexSpoolPlannerTest.java index b0b0ccb447cf3..d83844f183bb2 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashIndexSpoolPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashIndexSpoolPlannerTest.java @@ -84,7 +84,7 @@ public void testSingleKey() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule" ); System.out.println("+++\n" + RelOptUtil.toString(phys)); @@ -149,7 +149,7 @@ public void testMultipleKeys() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter", "FilterSpoolMergeToSortedIndexSpoolRule" ); checkSplitAndSerialization(phys, publicSchema); @@ -212,7 +212,7 @@ public void testSourceWithoutCollation() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter" ); checkSplitAndSerialization(phys, publicSchema); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashJoinPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashJoinPlannerTest.java new file mode 100644 index 0000000000000..1b68cee57c3cf --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/HashJoinPlannerTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.planner; + +import java.util.List; +import org.apache.calcite.plan.RelOptPlanner.CannotPlanException; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort; +import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema; +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING; +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; + +/** */ +public class HashJoinPlannerTest extends AbstractPlannerTest { + /** */ + private static final String[] DISABLED_RULES = {"NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "MergeJoinConverter", + "JoinCommuteRule"}; + + /** */ + private static final String[] JOIN_TYPES = {"LEFT", "RIGHT", "INNER", "FULL"}; + + /** */ + @Test + public void testHashJoinKeepsLeftCollation() throws Exception { + TestTable tbl1 = createSimpleTable(); + TestTable tbl2 = createComplexTable(); + + IgniteSchema schema = createSchema(tbl1, tbl2); + + String sql = "select t1.ID, t2.ID1 " + + "from TEST_TBL_CMPLX t2 " + + "join TEST_TBL t1 on t1.id = t2.id1 " + + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST"; + + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)) + .and(nodeOrAnyChild(isInstanceOf(IgniteSort.class)).negate()), DISABLED_RULES); + } + + /** */ + @Test + public void testHashJoinErasesRightCollation() throws Exception { + TestTable tbl1 = createSimpleTable(); + TestTable tbl2 = createComplexTable(); + + IgniteSchema schema = createSchema(tbl1, tbl2); + + String sql = "select t1.ID, t2.ID1 " + + "from TEST_TBL t1 " + + "join TEST_TBL_CMPLX t2 on t1.id = t2.id1 " + + "order by t2.ID1 NULLS LAST, t2.ID2 NULLS LAST"; + + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(IgniteSort.class) + .and(input(isInstanceOf(IgniteHashJoin.class)))), DISABLED_RULES); + } + + /** */ + @Test + public void testHashJoinWinsOnSkewedLeftInput() throws Exception { + TestTable smallTbl = createSimpleTable("SMALL_TBL", 1000); + TestTable largeTbl = createSimpleTable("LARGE_TBL", 500_000); + + IgniteSchema schema = createSchema(smallTbl, largeTbl); + + assertPlan( + "select t1.ID, t1.INT_VAL, t2.ID, t2.INT_VAL from LARGE_TBL t1 join SMALL_TBL t2 on t1.INT_VAL = t2.INT_VAL", + schema, + nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), + "JoinCommuteRule" + ); + + assertPlan( + "select t1.ID, t1.INT_VAL, t2.ID, t2.INT_VAL from SMALL_TBL t1 join LARGE_TBL t2 on t1.INT_VAL = t2.INT_VAL", + schema, + nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), + "JoinCommuteRule" + ); + + // Merge join can consume less CPU resources. + assertPlan( + "select t1.ID, t1.INT_VAL, t2.ID, t2.INT_VAL from SMALL_TBL t1 join LARGE_TBL t2 on t1.ID = t2.ID", + schema, + nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).negate()), + "JoinCommuteRule" + ); + } + + /** */ + private static @Nullable IgniteSort sortOnTopOfJoin(IgniteRel root) { + List sortNodes = findNodes(root, byClass(IgniteSort.class) + .and(node -> node.getInputs().size() == 1 && node.getInput(0) instanceof Join)); + + if (sortNodes.size() > 1) + throw new IllegalStateException("Incorrect sort nodes number: expected 1, actual " + sortNodes.size()); + + return sortNodes.isEmpty() ? null : sortNodes.get(0); + } + + /** */ + @Test + public void testHashJoinApplied() throws Exception { + // Parms: request, can be planned, only INNER or SEMI join. + List> testParams = F.asList( + F.asList("select t1.c1 from t1 %s join t2 on t1.id = t2.id", true, false), + F.asList("select t1.c1 from t1 %s join t2 on t1.id = t2.id and t1.c1=t2.c1", true, false), + F.asList("select t1.c1 from t1 %s join t2 using(c1)", true, false), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = 1", false, false), + F.asList("select t1.c1 from t1 %s join t2 ON t1.id is not distinct from t2.c1", true, false), + F.asList("select t1.c1 from t1 %s join t2 ON t1.id is not distinct from t2.c1 and t1.c1 is not distinct from t2.id", + true, false), + F.asList("select t1.c1 from t1 %s join t2 ON t1.id is not distinct from t2.c1 and t1.c1 = t2.id", true, false), + F.asList("select t1.c1 from t1 %s join t2 ON t1.id is not distinct from t2.c1 and t1.c1 > t2.id", true, true), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = ?", false, false), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = OCTET_LENGTH('TEST')", false, false), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = LOG10(t1.c1)", false, false), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = t2.c1 and t1.ID > t2.ID", true, true), + F.asList("select t1.c1 from t1 %s join t2 on t1.c1 = 1 and t2.c1 = 1", false, false) + ); + + for (List paramSet : testParams) { + assert paramSet != null && paramSet.size() == 3; + + String sql = (String)paramSet.get(0); + boolean canBePlanned = (Boolean)paramSet.get(1); + boolean onlyInnerOrSemi = (Boolean)paramSet.get(2); + + TestTable tbl1 = createTable("T1", IgniteDistributions.single(), "ID", Integer.class, "C1", Integer.class); + TestTable tbl2 = createTable("T2", IgniteDistributions.single(), "ID", Integer.class, "C1", Integer.class); + + IgniteSchema schema = createSchema(tbl1, tbl2); + + for (String joinType : JOIN_TYPES) { + if (onlyInnerOrSemi && !joinType.equals("INNER") && !joinType.equals("SEMI")) + continue; + + String sql0 = String.format(sql, joinType); + + if (log.isInfoEnabled()) + log.info("Testing query '" + sql0 + "' for join type " + joinType); + + if (canBePlanned) + assertPlan(sql0, schema, nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class)), DISABLED_RULES); + else { + assertThrows(null, () -> physicalPlan(sql0, schema, DISABLED_RULES), CannotPlanException.class, + "There are not enough rules"); + } + } + } + } + + /** */ + private static TestTable createSimpleTable() { + return createSimpleTable("TEST_TBL", DEFAULT_TBL_SIZE); + } + + /** */ + private static TestTable createSimpleTable(String name, int size) { + return createTable( + name, + size, + IgniteDistributions.affinity(0, CU.cacheId("default"), 0), + "ID", Integer.class, + "INT_VAL", Integer.class, + "STR_VAL", String.class + ).addIndex( + RelCollations.of(new RelFieldCollation(0, ASCENDING, RelFieldCollation.NullDirection.LAST)), + "PK" + ); + } + + /** */ + private static TestTable createComplexTable() { + return createTable( + "TEST_TBL_CMPLX", + DEFAULT_TBL_SIZE, + IgniteDistributions.affinity(ImmutableIntList.of(0, 1), CU.cacheId("default"), 0), + "ID1", Integer.class, + "ID2", Integer.class, + "STR_VAL", String.class + ).addIndex( + RelCollations.of( + new RelFieldCollation(0, ASCENDING, RelFieldCollation.NullDirection.LAST), + new RelFieldCollation(1, ASCENDING, RelFieldCollation.NullDirection.LAST) + ), + "PK" + ); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java index b37558d9707a7..5c72d742ffb47 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinColocationPlannerTest.java @@ -22,6 +22,7 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Join; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin; @@ -45,6 +46,9 @@ * Test suite to verify join colocation. */ public class JoinColocationPlannerTest extends AbstractPlannerTest { + /** */ + private static final String[] DISABLED_RULES = new String[] {"HashJoinConverter", "MergeJoinConverter"}; + /** * Join of the same tables with a simple affinity is expected to be colocated. */ @@ -65,16 +69,13 @@ public void joinSameTableSimpleAff() throws Exception { "from TEST_TBL t1 " + "join TEST_TBL t2 on t1.id = t2.id"; - RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"); - - IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class)); - - String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); - - assertThat(invalidPlanMsg, join, notNullValue()); - assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true)); - assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class)); - assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class)); + for (String disabledRule : DISABLED_RULES) { + assertPlan(sql, schema, nodeOrAnyChild(isInstanceOf(AbstractIgniteJoin.class) + .and(join -> join.distribution().function().affinity()) + .and(input(0, isInstanceOf(IgniteIndexScan.class))) + .and(input(1, isInstanceOf(IgniteIndexScan.class))) + ), "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule); + } } /** @@ -98,16 +99,18 @@ public void joinSameTableComplexAff() throws Exception { "from TEST_TBL t1 " + "join TEST_TBL t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2"; - RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"); + for (String disabledRule : DISABLED_RULES) { + RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule); - IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class)); + AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class)); - String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); + String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); - assertThat(invalidPlanMsg, join, notNullValue()); - assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true)); - assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class)); - assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class)); + assertThat(invalidPlanMsg, join, notNullValue()); + assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true)); + assertThat(invalidPlanMsg, join.getLeft(), instanceOf(IgniteIndexScan.class)); + assertThat(invalidPlanMsg, join.getRight(), instanceOf(IgniteIndexScan.class)); + } } /** @@ -143,22 +146,24 @@ public void joinComplexToSimpleAff() throws Exception { "from COMPLEX_TBL t1 " + "join SIMPLE_TBL t2 on t1.id1 = t2.id"; - RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"); + for (String disabledRule : DISABLED_RULES) { + RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule); - IgniteMergeJoin join = findFirstNode(phys, byClass(IgniteMergeJoin.class)); + AbstractIgniteJoin join = findFirstNode(phys, byClass(AbstractIgniteJoin.class)); - String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); + String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); - assertThat(invalidPlanMsg, join, notNullValue()); - assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true)); + assertThat(invalidPlanMsg, join, notNullValue()); + assertThat(invalidPlanMsg, join.distribution().function().affinity(), is(true)); - List exchanges = findNodes(phys, node -> node instanceof IgniteExchange - && ((IgniteRel)node).distribution().function().affinity()); + List exchanges = findNodes(phys, node -> node instanceof IgniteExchange + && ((IgniteRel)node).distribution().function().affinity()); - assertThat(invalidPlanMsg, exchanges, hasSize(1)); - assertThat(invalidPlanMsg, exchanges.get(0).getInput(0), instanceOf(IgniteIndexScan.class)); - assertThat(invalidPlanMsg, exchanges.get(0).getInput(0) - .getTable().unwrap(TestTable.class), equalTo(complexTbl)); + assertThat(invalidPlanMsg, exchanges, hasSize(1)); + assertThat(invalidPlanMsg, exchanges.get(0).getInput(0), instanceOf(IgniteIndexScan.class)); + assertThat(invalidPlanMsg, exchanges.get(0).getInput(0) + .getTable().unwrap(TestTable.class), equalTo(complexTbl)); + } } /** @@ -192,14 +197,16 @@ public void joinComplexToComplexAffWithDifferentOrder() throws Exception { "from COMPLEX_TBL_DIRECT t1 " + "join COMPLEX_TBL_INDIRECT t2 on t1.id1 = t2.id1 and t1.id2 = t2.id2"; - RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin"); + for (String disabledRule : DISABLED_RULES) { + RelNode phys = physicalPlan(sql, schema, "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", disabledRule); - IgniteMergeJoin exchange = findFirstNode(phys, node -> node instanceof IgniteExchange - && ((IgniteRel)node).distribution().function().affinity()); + IgniteMergeJoin exchange = findFirstNode(phys, node -> node instanceof IgniteExchange + && ((IgniteRel)node).distribution().function().affinity()); - String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); + String invalidPlanMsg = "Invalid plan:\n" + RelOptUtil.toString(phys); - assertThat(invalidPlanMsg, exchange, nullValue()); + assertThat(invalidPlanMsg, exchange, nullValue()); + } } /** diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinCommutePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinCommutePlannerTest.java index 997e4ceaea8b2..8e704c047c535 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinCommutePlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/JoinCommutePlannerTest.java @@ -170,7 +170,8 @@ public void testOuterCommute() throws Exception { String sql = "SELECT COUNT(*) FROM SMALL s RIGHT JOIN HUGE h on h.id = s.id"; - IgniteRel phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin"); + IgniteRel phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", + "HashJoinConverter"); assertNotNull(phys); @@ -182,8 +183,8 @@ public void testOuterCommute() throws Exception { assertEquals(JoinRelType.LEFT, join.getJoinType()); - PlanningContext ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", - "CorrelatedNestedLoopJoin"); + PlanningContext ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", + "HashJoinConverter"); RelOptPlanner pl = ctx.cluster().getPlanner(); @@ -193,7 +194,8 @@ public void testOuterCommute() throws Exception { assertNotNull(phys); - phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule"); + phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter", + "JoinCommuteRule"); join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class)); @@ -204,7 +206,8 @@ public void testOuterCommute() throws Exception { // no commute assertEquals(JoinRelType.RIGHT, join.getJoinType()); - ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule"); + ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter", + "JoinCommuteRule"); pl = ctx.cluster().getPlanner(); @@ -272,7 +275,8 @@ public void testInnerCommute() throws Exception { String sql = "SELECT COUNT(*) FROM SMALL s JOIN HUGE h on h.id = s.id"; - IgniteRel phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin"); + IgniteRel phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", + "HashJoinConverter"); assertNotNull(phys); @@ -299,7 +303,8 @@ public void testInnerCommute() throws Exception { assertEquals(JoinRelType.INNER, join.getJoinType()); - PlanningContext ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin"); + PlanningContext ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", + "HashJoinConverter"); RelOptPlanner pl = ctx.cluster().getPlanner(); @@ -309,7 +314,8 @@ public void testInnerCommute() throws Exception { assertNotNull(phys); - phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule"); + phys = physicalPlan(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter", + "JoinCommuteRule"); join = findFirstNode(phys, byClass(IgniteNestedLoopJoin.class)); proj = findFirstNode(phys, byClass(IgniteProject.class)); @@ -335,7 +341,8 @@ public void testInnerCommute() throws Exception { // no commute assertEquals(JoinRelType.INNER, join.getJoinType()); - ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "JoinCommuteRule"); + ctx = plannerCtx(sql, publicSchema, planLsnr, "MergeJoinConverter", "CorrelatedNestedLoopJoin", "HashJoinConverter", + "JoinCommuteRule"); pl = ctx.cluster().getPlanner(); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/MergeJoinPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/MergeJoinPlannerTest.java index 97bb8425f1f8e..c15009fca3372 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/MergeJoinPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/MergeJoinPlannerTest.java @@ -44,7 +44,8 @@ public class MergeJoinPlannerTest extends AbstractPlannerTest { "NestedLoopJoinConverter", "CorrelatedNestedLoopJoin", "FilterSpoolMergeRule", - "JoinCommuteRule" + "JoinCommuteRule", + "HashJoinConverter" }; /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortedIndexSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortedIndexSpoolPlannerTest.java index ceed9c6d7d76c..63ed1c6a8d6b5 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortedIndexSpoolPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/SortedIndexSpoolPlannerTest.java @@ -90,7 +90,7 @@ public void testNotColocatedEqJoin() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" ); checkSplitAndSerialization(phys, publicSchema); @@ -157,7 +157,7 @@ public void testPartialIndexForCondition() throws Exception { IgniteRel phys = physicalPlan( sql, publicSchema, - "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" ); System.out.println("+++ \n" + RelOptUtil.toString(phys)); @@ -252,7 +252,7 @@ public void testRestoreCollation() throws Exception { .and(input(1, isInstanceOf(IgniteSortedIndexSpool.class) .and(spool -> spool.collation().getFieldCollations().get(0).getFieldIndex() == equalIdx) ))), - "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" + "MergeJoinConverter", "NestedLoopJoinConverter", "HashJoinConverter", "FilterSpoolMergeToHashIndexSpoolRule" ); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/hints/JoinTypeHintPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/hints/JoinTypeHintPlannerTest.java index 66af9f8a106b0..692e3a9f6bd41 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/hints/JoinTypeHintPlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/hints/JoinTypeHintPlannerTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerHelper; import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIgniteJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin; import org.apache.ignite.internal.processors.query.calcite.rule.logical.IgniteMultiJoinOptimizeRule; @@ -43,9 +44,11 @@ import org.junit.Test; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.CNL_JOIN; +import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.HASH_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.MERGE_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NL_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_CNL_JOIN; +import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_HASH_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_MERGE_JOIN; import static org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition.NO_NL_JOIN; @@ -130,12 +133,12 @@ public void testHintsErrors() throws Exception { // Following hint must not override leading. lsnrLog.clearListeners(); - lsnr = LogListener.matches("Skipped hint '" + NL_JOIN.name() + "'") + lsnr = LogListener.matches("Skipped hint '" + HASH_JOIN.name() + "'") .andMatches("This join type is already disabled or forced to use before").build(); lsnrLog.registerListener(lsnr); - physicalPlan("SELECT /*+ " + MERGE_JOIN + "(TBL1)," + NL_JOIN + "(TBL1,TBL2) */ t1.v1, t2.v2 FROM TBL1 " + + physicalPlan("SELECT /*+ " + MERGE_JOIN + "(TBL1)," + HASH_JOIN + "(TBL1,TBL2) */ t1.v1, t2.v2 FROM TBL1 " + "t1 JOIN TBL2 t2 on t1.v3=t2.v3", schema); assertTrue(lsnr.check()); @@ -150,7 +153,7 @@ public void testHintsErrors() throws Exception { physicalPlan("SELECT /*+ " + NL_JOIN + "(TBL1) */ t1.v1, t2.v2 FROM TBL1 " + "t1 JOIN TBL2 t2 on t1.v3=t2.v3 where t2.v1 in " + - "(SELECT /*+ " + MERGE_JOIN + "(TBL1), " + CNL_JOIN + "(TBL1,TBL3) */ t3.v3 from TBL3 t3 JOIN TBL1 t4 " + + "(SELECT /*+ " + HASH_JOIN + "(TBL1), " + CNL_JOIN + "(TBL1,TBL3) */ t3.v3 from TBL3 t3 JOIN TBL1 t4 " + "on t3.v2=t4.v2)", schema, CORE_JOIN_REORDER_RULES); assertTrue(lsnr.check()); @@ -196,9 +199,9 @@ public void testHintsErrors() throws Exception { */ @Test public void testDisableNLJoin() throws Exception { - for (HintDefinition hint : Arrays.asList(NO_NL_JOIN, CNL_JOIN, MERGE_JOIN)) { + for (HintDefinition hint : Arrays.asList(NO_NL_JOIN, CNL_JOIN, MERGE_JOIN, HASH_JOIN)) { doTestDisableJoinTypeWith("TBL5", "TBL4", "INNER", IgniteNestedLoopJoin.class, - NO_NL_JOIN, "MergeJoinConverter"); + NO_NL_JOIN, "MergeJoinConverter", "HashJoinConverter"); doTestDisableJoinTypeWith("TBL3", "TBL1", "LEFT", IgniteNestedLoopJoin.class, NO_NL_JOIN); @@ -220,15 +223,34 @@ public void testDisableNLJoin() throws Exception { @Test public void testDisableMergeJoin() throws Exception { for (HintDefinition hint : Arrays.asList(NO_MERGE_JOIN, NL_JOIN, CNL_JOIN)) { - doTestDisableJoinTypeWith("TBL4", "TBL2", "INNER", IgniteMergeJoin.class, hint); + doTestDisableJoinTypeWith("TBL4", "TBL2", "INNER", IgniteMergeJoin.class, hint, "HashJoinConverter"); - doTestDisableJoinTypeWith("TBL4", "TBL2", "LEFT", IgniteMergeJoin.class, hint); + doTestDisableJoinTypeWith("TBL4", "TBL2", "LEFT", IgniteMergeJoin.class, hint, "HashJoinConverter"); // Correlated nested loop join supports only INNER and LEFT join types. if (hint != CNL_JOIN) { - doTestDisableJoinTypeWith("TBL4", "TBL2", "RIGHT", IgniteMergeJoin.class, hint); + doTestDisableJoinTypeWith("TBL4", "TBL2", "RIGHT", IgniteMergeJoin.class, hint, "HashJoinConverter"); - doTestDisableJoinTypeWith("TBL4", "TBL2", "FULL", IgniteMergeJoin.class, hint); + doTestDisableJoinTypeWith("TBL4", "TBL2", "FULL", IgniteMergeJoin.class, hint, "HashJoinConverter"); + } + } + } + + /** + * Tests hash join is disabled by hints. + */ + @Test + public void testDisableHashJoin() throws Exception { + for (HintDefinition hint : Arrays.asList(NO_HASH_JOIN, MERGE_JOIN, NL_JOIN, CNL_JOIN)) { + doTestDisableJoinTypeWith("TBL4", "TBL2", "INNER", IgniteHashJoin.class, hint); + + doTestDisableJoinTypeWith("TBL4", "TBL2", "LEFT", IgniteHashJoin.class, hint); + + // Correlated nested loop join supports only INNER and LEFT join types. + if (hint != CNL_JOIN) { + doTestDisableJoinTypeWith("TBL4", "TBL2", "RIGHT", IgniteHashJoin.class, hint); + + doTestDisableJoinTypeWith("TBL4", "TBL2", "FULL", IgniteHashJoin.class, hint); } } } @@ -251,6 +273,24 @@ public void testMergeJoinEnabled() throws Exception { MERGE_JOIN, IgniteMergeJoin.class, CORE_JOIN_REORDER_RULES); } + /** + * Tests the hash join is enabled by the hint instead of the other joins. + */ + @Test + public void testHashJoinEnabled() throws Exception { + doTestCertainJoinTypeEnabled("TBL1", "INNER", "TBL2", IgniteCorrelatedNestedLoopJoin.class, + HASH_JOIN, IgniteHashJoin.class); + + doTestCertainJoinTypeEnabled("TBL1", "RIGHT", "TBL2", IgniteNestedLoopJoin.class, + HASH_JOIN, IgniteHashJoin.class); + + doTestCertainJoinTypeEnabled("TBL1", "INNER", "TBL2", IgniteCorrelatedNestedLoopJoin.class, + HASH_JOIN, IgniteHashJoin.class, CORE_JOIN_REORDER_RULES); + + doTestCertainJoinTypeEnabled("TBL1", "RIGHT", "TBL2", IgniteNestedLoopJoin.class, + HASH_JOIN, IgniteHashJoin.class, CORE_JOIN_REORDER_RULES); + } + /** * Tests the nested loop join is enabled by the hint instead of the other joins. */ @@ -259,13 +299,13 @@ public void testNLJoinEnabled() throws Exception { doTestCertainJoinTypeEnabled("TBL2", "INNER", "TBL1", IgniteCorrelatedNestedLoopJoin.class, NL_JOIN, IgniteNestedLoopJoin.class); - doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteMergeJoin.class, + doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteHashJoin.class, NL_JOIN, IgniteNestedLoopJoin.class); doTestCertainJoinTypeEnabled("TBL1", "LEFT", "TBL2", IgniteCorrelatedNestedLoopJoin.class, NL_JOIN, IgniteNestedLoopJoin.class, CORE_JOIN_REORDER_RULES); - doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteMergeJoin.class, + doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteHashJoin.class, NL_JOIN, IgniteNestedLoopJoin.class, CORE_JOIN_REORDER_RULES); } @@ -291,7 +331,7 @@ public void testCNLJoinEnabled() throws Exception { doTestCertainJoinTypeEnabled("TBL2", "LEFT", "TBL1", IgniteNestedLoopJoin.class, CNL_JOIN, IgniteCorrelatedNestedLoopJoin.class); - doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteMergeJoin.class, + doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteHashJoin.class, CNL_JOIN, IgniteCorrelatedNestedLoopJoin.class); // Even CNL join doesn't support RIGHT join, join type and join inputs might be switched by Calcite. @@ -301,7 +341,7 @@ public void testCNLJoinEnabled() throws Exception { doTestCertainJoinTypeEnabled("TBL2", "LEFT", "TBL1", IgniteNestedLoopJoin.class, CNL_JOIN, IgniteCorrelatedNestedLoopJoin.class, CORE_JOIN_REORDER_RULES); - doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteMergeJoin.class, + doTestCertainJoinTypeEnabled("TBL5", "INNER", "TBL4", IgniteHashJoin.class, CNL_JOIN, IgniteCorrelatedNestedLoopJoin.class, CORE_JOIN_REORDER_RULES); } @@ -397,15 +437,15 @@ public void testTableHints() throws Exception { .and(input(1, nodeOrAnyChild(isTableScan("TBL1")))))), CORE_JOIN_REORDER_RULES); // Table hint has a bigger priority. Leading CNL_JOIN is ignored. - assertPlan(String.format(sqlTpl, "/*+ " + CNL_JOIN + " */", "/*+ " + NL_JOIN + " */", "/*+ " + MERGE_JOIN + " */"), + assertPlan(String.format(sqlTpl, "/*+ " + CNL_JOIN + " */", "/*+ " + NL_JOIN + " */", "/*+ " + HASH_JOIN + " */"), schema, nodeOrAnyChild(isInstanceOf(IgniteNestedLoopJoin.class).and(input(1, isTableScan("TBL3")))) - .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class) + .and(nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class) .and(input(1, nodeOrAnyChild(isTableScan("TBL1")))))), CORE_JOIN_REORDER_RULES); // Leading query hint works only for the second join. - assertPlan(String.format(sqlTpl, "/*+ " + CNL_JOIN + " */", "/*+ " + NL_JOIN + " */", ""), schema, + assertPlan(String.format(sqlTpl, "/*+ " + HASH_JOIN + " */", "/*+ " + NL_JOIN + " */", ""), schema, nodeOrAnyChild(isInstanceOf(IgniteNestedLoopJoin.class).and(input(1, isTableScan("TBL3")))) - .and(nodeOrAnyChild(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class) + .and(nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class) .and(input(1, nodeOrAnyChild(isTableScan("TBL1")))))), CORE_JOIN_REORDER_RULES); // Table hint with wrong table name is ignored. @@ -657,13 +697,13 @@ public void testSeveralDisables() throws Exception { assertPlan(String.format(sqlTpl, "/*+ " + NO_CNL_JOIN + ',' + NO_NL_JOIN + " */"), schema, nodeOrAnyChild(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)).negate() .and(nodeOrAnyChild(isInstanceOf(IgniteNestedLoopJoin.class)).negate()) - .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class).and(hasNestedTableScan("TBL1")) + .and(nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).and(hasNestedTableScan("TBL1")) .and(hasNestedTableScan("TBL2")))), CORE_JOIN_REORDER_RULES); assertPlan(String.format(sqlTpl, "/*+ " + NO_CNL_JOIN + "(TBL1)," + NO_NL_JOIN + "(TBL2) */"), schema, nodeOrAnyChild(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)).negate() .and(nodeOrAnyChild(isInstanceOf(IgniteNestedLoopJoin.class)).negate()) - .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class).and(hasNestedTableScan("TBL1")) + .and(nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).and(hasNestedTableScan("TBL1")) .and(hasNestedTableScan("TBL2")))), CORE_JOIN_REORDER_RULES); // Check with forcing in the middle. @@ -688,7 +728,7 @@ schema, nodeOrAnyChild(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)).negat assertPlan(String.format(sqlTpl, "/*+ " + NO_CNL_JOIN + ',' + NO_NL_JOIN + ',' + NO_MERGE_JOIN + " */"), schema, nodeOrAnyChild(isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)).negate() .and(nodeOrAnyChild(isInstanceOf(IgniteNestedLoopJoin.class)).negate()) - .and(nodeOrAnyChild(isInstanceOf(IgniteMergeJoin.class).and(hasNestedTableScan("TBL1")) + .and(nodeOrAnyChild(isInstanceOf(IgniteHashJoin.class).and(hasNestedTableScan("TBL1")) .and(hasNestedTableScan("TBL2")))), CORE_JOIN_REORDER_RULES); // Check many duplicated disables doesn't erase other disables. diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java index bd6f6eb2258c3..7b0cb00b361b9 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateSingleGroupExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashIndexSpoolExecutionTest; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashJoinExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.JoinBuffersExecutionTest; import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitExecutionTest; @@ -45,6 +46,7 @@ ExecutionTest.class, ContinuousExecutionTest.class, MergeJoinExecutionTest.class, + HashJoinExecutionTest.class, NestedLoopJoinExecutionTest.class, JoinBuffersExecutionTest.class, TableSpoolExecutionTest.class, diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java index eec724a9a64a3..30415c303720e 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.DataTypesPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest; +import org.apache.ignite.internal.processors.query.calcite.planner.HashJoinPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.IndexRebuildPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.IndexSearchBoundsPlannerTest; import org.apache.ignite.internal.processors.query.calcite.planner.InlineIndexScanPlannerTest; @@ -76,6 +77,7 @@ JoinCommutePlannerTest.class, LimitOffsetPlannerTest.class, MergeJoinPlannerTest.class, + HashJoinPlannerTest.class, StatisticsPlannerTest.class, CorrelatedSubqueryPlannerTest.class, JoinWithUsingPlannerTest.class, diff --git a/modules/calcite/src/test/sql/sqlite/join/join1.test b/modules/calcite/src/test/sql/sqlite/join/join1.test index d1f33fa306274..09b41b0b30ae3 100644 --- a/modules/calcite/src/test/sql/sqlite/join/join1.test +++ b/modules/calcite/src/test/sql/sqlite/join/join1.test @@ -24,7 +24,7 @@ statement ok INSERT INTO t2 VALUES(3,4,5); query III rowsort -SELECT /*+ CNL_JOIN */ t2.* FROM t2 NATURAL JOIN t1; +SELECT /*+ NL_JOIN */ t2.* FROM t2 NATURAL JOIN t1; ---- 2 3 4 3 4 5 @@ -41,6 +41,12 @@ SELECT /*+ MERGE_JOIN */ t2.* FROM t2 NATURAL JOIN t1; 2 3 4 3 4 5 +query III rowsort +SELECT /*+ HASH_JOIN */ t2.* FROM t2 NATURAL JOIN t1; +---- +2 3 4 +3 4 5 + query III rowsort SELECT t1.* FROM t2 NATURAL JOIN t1; ----