Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit a7ad5df

Browse files
committed
TAJO-1547 Implement hash-cross join
1 parent 67a3117 commit a7ad5df

7 files changed

Lines changed: 133 additions & 52 deletions

File tree

tajo-common/src/main/java/org/apache/tajo/SessionVars.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ public enum SessionVars implements ConfigKey {
116116
"limited size for hash inner join (mb)", DEFAULT, Long.class, Validators.min("0")),
117117
OUTER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash outer join (mb)",
118118
DEFAULT, Long.class, Validators.min("0")),
119+
CROSS_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash cross join (mb)",
120+
DEFAULT, Long.class, Validators.min("0")),
119121
HASH_GROUPBY_SIZE_LIMIT(ConfVars.$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD, "limited size for hash groupby (mb)",
120122
DEFAULT, Long.class, Validators.min("0")),
121123
MAX_OUTPUT_FILE_SIZE(ConfVars.$MAX_OUTPUT_FILE_SIZE, "Maximum per-output file size (mb). 0 means infinite.", DEFAULT,

tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,8 @@ public static enum ConfVars implements ConfigKey {
328328
(long)256 * 1048576),
329329
$EXECUTOR_OUTER_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.outer.in-memory-hash-threshold-bytes",
330330
(long)256 * 1048576),
331+
$EXECUTOR_CROSS_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.cross.in-memory-hash-threshold-bytes",
332+
(long)256 * 1048576),
331333
$EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD("tajo.executor.groupby.in-memory-hash-threshold-bytes",
332334
(long)256 * 1048576),
333335
$MAX_OUTPUT_FILE_SIZE("tajo.query.max-outfile-size-mb", 0), // zero means infinite

tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ public interface PhysicalPlanner {
3333
public PhysicalExec createPlan(TaskAttemptContext context,
3434
LogicalNode logicalPlan)
3535
throws InternalException;
36+
37+
/** Identifies which of two inputs is meant: the left one or the right one. */
enum INPUT { LEFT, RIGHT }
3638
}

tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java

Lines changed: 44 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -256,28 +256,38 @@ public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) th
256256
return size;
257257
}
258258

259-
@VisibleForTesting
260-
public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
259+
public boolean isInnerJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
261260
throws IOException {
262-
String [] lineage = PlannerUtil.getRelationLineage(node);
263-
long volume = estimateSizeRecursive(context, lineage);
264-
boolean inMemoryInnerJoinFlag = false;
261+
return isHashApplicable(context, node, SessionVars.INNER_HASH_JOIN_SIZE_LIMIT, input);
262+
}
265263

266-
QueryContext queryContext = context.getQueryContext();
264+
public boolean isOuterJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
265+
throws IOException {
266+
return isHashApplicable(context, node, SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT, input);
267+
}
267268

268-
if (queryContext.containsKey(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT)) {
269-
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.INNER_HASH_JOIN_SIZE_LIMIT);
270-
} else {
271-
inMemoryInnerJoinFlag = volume <= context.getQueryContext().getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
272-
}
269+
public boolean isCrossJoinHashApplicable(TaskAttemptContext context, LogicalNode node, INPUT input)
270+
throws IOException {
271+
return isHashApplicable(context, node, SessionVars.CROSS_HASH_JOIN_SIZE_LIMIT, input);
272+
}
273+
274+
private boolean isHashApplicable(TaskAttemptContext context, LogicalNode node,
275+
SessionVars sessionVar, INPUT input) throws IOException {
276+
String [] lineage = PlannerUtil.getRelationLineage(node);
277+
long volume = estimateSizeRecursive(context, lineage);
273278

274-
LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main maemory.",
279+
boolean applicable = volume <= getThreshold(context.getQueryContext(), sessionVar);
280+
LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %sfit to main memory.",
275281
context.getTaskId().toString(),
276-
(left ? "Left" : "Right"),
282+
input.name().toLowerCase(),
277283
TUtil.arrayToString(lineage),
278284
FileUtil.humanReadableByteCount(volume, false),
279-
(inMemoryInnerJoinFlag ? "" : "not ")));
280-
return inMemoryInnerJoinFlag;
285+
(applicable ? "" : "not ")));
286+
return applicable;
287+
}
288+
289+
private long getThreshold(QueryContext context, SessionVars sessionVar) {
290+
return context.getLong(sessionVar, context.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT));
281291
}
282292

283293
public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
@@ -325,6 +335,9 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl
325335
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
326336

327337
switch (algorithm) {
338+
case IN_MEMORY_HASH_JOIN:
339+
LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]");
340+
return new HashCrossJoinExec(context, plan, leftExec, rightExec);
328341
case NESTED_LOOP_JOIN:
329342
LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
330343
return new NLJoinExec(context, plan, leftExec, rightExec);
@@ -336,10 +349,19 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl
336349
LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
337350
return new BNLJoinExec(context, plan, leftExec, rightExec);
338351
}
352+
}
339353

340-
} else {
341-
return new BNLJoinExec(context, plan, leftExec, rightExec);
354+
boolean inMemoryHashJoin =
355+
isCrossJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) ||
356+
isCrossJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT);
357+
358+
if (inMemoryHashJoin) {
359+
LOG.info("Join (" + plan.getPID() +") chooses [Hash Join]");
360+
// returns two PhysicalExec. smaller one is 0, and larger one is 1.
361+
PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
362+
return new HashCrossJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
342363
}
364+
return new BNLJoinExec(context, plan, leftExec, rightExec);
343365
}
344366

345367
private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -417,11 +439,9 @@ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode pl
417439

418440
private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
419441
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
420-
boolean inMemoryHashJoin = false;
421-
if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
422-
|| checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
423-
inMemoryHashJoin = true;
424-
}
442+
boolean inMemoryHashJoin =
443+
isInnerJoinHashApplicable(context, plan.getLeftChild(), INPUT.LEFT) ||
444+
isInnerJoinHashApplicable(context, plan.getRightChild(), INPUT.RIGHT);
425445

426446
if (inMemoryHashJoin) {
427447
LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
@@ -480,19 +500,7 @@ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNod
480500

481501
private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
482502
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
483-
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
484-
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
485-
boolean hashJoin;
486-
487-
QueryContext queryContext = context.getQueryContext();
488-
489-
if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
490-
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
491-
} else {
492-
hashJoin = rightTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
493-
}
494-
495-
if (hashJoin) {
503+
if (isOuterJoinHashApplicable(context, plan, INPUT.RIGHT)) {
496504
// we can implement left outer join using hash join, using the right operand as the build relation
497505
LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
498506
return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
@@ -508,19 +516,7 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod
508516
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
509517
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
510518
// blocking, but merge join is blocking as well)
511-
String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
512-
long leftTableVolume = estimateSizeRecursive(context, outerLineage4);
513-
boolean hashJoin;
514-
515-
QueryContext queryContext = context.getQueryContext();
516-
517-
if (queryContext.containsKey(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT)) {
518-
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.OUTER_HASH_JOIN_SIZE_LIMIT);
519-
} else {
520-
hashJoin = leftTableVolume < queryContext.getLong(SessionVars.HASH_JOIN_SIZE_LIMIT);
521-
}
522-
523-
if (hashJoin){
519+
if (isOuterJoinHashApplicable(context, plan, INPUT.LEFT)){
524520
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
525521
return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
526522
} else {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.tajo.engine.planner.physical;
20+
21+
import org.apache.tajo.plan.logical.JoinNode;
22+
import org.apache.tajo.storage.Tuple;
23+
import org.apache.tajo.storage.VTuple;
24+
import org.apache.tajo.worker.TaskAttemptContext;
25+
26+
import java.io.IOException;
27+
import java.util.Iterator;
28+
import java.util.List;
29+
30+
public class HashCrossJoinExec extends HashJoinExec {
31+
32+
private Iterator<List<Tuple>> outIterator;
33+
34+
public HashCrossJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
35+
PhysicalExec rightExec) {
36+
super(context, plan, leftExec, rightExec);
37+
}
38+
39+
@Override
40+
public Tuple next() throws IOException {
41+
if (first) {
42+
loadRightToHashTable();
43+
}
44+
if (tupleSlots.isEmpty()) {
45+
return null;
46+
}
47+
48+
while (!context.isStopped() && !finished) {
49+
if (shouldGetLeftTuple) { // initially, it is true.
50+
// getting new outer
51+
leftTuple = leftChild.next(); // it comes from a disk
52+
if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
53+
finished = true;
54+
return null;
55+
}
56+
outIterator = tupleSlots.values().iterator();
57+
iterator = outIterator.next().iterator();
58+
shouldGetLeftTuple = false;
59+
}
60+
61+
// getting a next right tuple on in-memory hash table.
62+
while (!iterator.hasNext() && outIterator.hasNext()) {
63+
iterator = outIterator.next().iterator();
64+
}
65+
if (!iterator.hasNext()) {
66+
shouldGetLeftTuple = true;
67+
continue;
68+
}
69+
frameTuple.set(leftTuple, iterator.next());
70+
projector.eval(frameTuple, outTuple);
71+
return new VTuple(outTuple);
72+
}
73+
74+
return null;
75+
}
76+
}

tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import java.io.IOException;
5353

5454
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
55+
import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.LEFT;
56+
import static org.apache.tajo.engine.planner.PhysicalPlanner.INPUT.RIGHT;
5557
import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
5658
import static org.junit.Assert.*;
5759

@@ -269,11 +271,11 @@ private static boolean assertCheckInnerJoinRelatedFunctions(TaskAttemptContext c
269271
}
270272

271273
if (leftSmaller) {
272-
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
273-
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
274+
assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT));
275+
assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT));
274276
} else {
275-
assertFalse(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getLeftChild(), true));
276-
assertTrue(phyPlanner.checkIfInMemoryInnerJoinIsPossible(ctx, joinNode.getRightChild(), false));
277+
assertFalse(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getLeftChild(), LEFT));
278+
assertTrue(phyPlanner.isInnerJoinHashApplicable(ctx, joinNode.getRightChild(), RIGHT));
277279
}
278280

279281
return leftSmaller;

tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Available Session Variables:
3030
\set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
3131
\set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)
3232
\set OUTER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash outer join (mb)
33+
\set CROSS_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash cross join (mb)
3334
\set HASH_GROUPBY_SIZE_LIMIT [long value] - limited size for hash groupby (mb)
3435
\set MAX_OUTPUT_FILE_SIZE [int value] - Maximum per-output file size (mb). 0 means infinite.
3536
\set NULL_CHAR [text value] - null char of text file output

0 commit comments

Comments
 (0)