Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.

Commit d92af48

Browse files
committed
TAJO-1554 Implement hash-theta join
1 parent 4f5f5d0 commit d92af48

49 files changed

Lines changed: 1391 additions & 298 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ public boolean containsAll(Collection<Column> columns) {
313313

314314
public boolean containsAny(Collection<Column> columns) {
315315
for (Column column : columns) {
316-
if (fields.contains(column)) {
316+
if (contains(column)) {
317317
return true;
318318
}
319319
}

tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.net.InetAddress;
2929
import java.util.Arrays;
3030

31-
public class VTuple implements Tuple, Cloneable {
31+
public class VTuple implements Tuple, Cloneable, Comparable<Tuple> {
3232
@Expose public Datum [] values;
3333
@Expose private long offset;
3434

@@ -237,4 +237,21 @@ public static String toDisplayString(Datum [] values) {
237237
str.append(')');
238238
return str.toString();
239239
}
240+
241+
@Override
242+
public int compareTo(Tuple other) {
243+
for (int i = 0; i < size(); i++) {
244+
if (isNull(i) && other.isNull(i)) {
245+
continue;
246+
}
247+
if (isNull(i) || other.isNull(i)) {
248+
return isNull(i) ? 1 : -1;
249+
}
250+
int compare = get(i).compareTo(other.get(i));
251+
if (compare != 0) {
252+
return compare;
253+
}
254+
}
255+
return 0;
256+
}
240257
}

tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,24 @@ public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, Lo
281281
return inMemoryInnerJoinFlag;
282282
}
283283

284-
public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
285-
PhysicalExec rightExec) throws IOException {
284+
public CommonJoinExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
285+
PhysicalExec rightExec) throws IOException {
286+
CommonJoinExec joinExec;
287+
try {
288+
joinExec = createJoinExec(context, joinNode, leftExec, rightExec);
289+
} catch (Exception e) {
290+
LOG.warn("Failed to make join exec, failing back to CROSS join", e);
291+
return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
292+
}
293+
if (joinExec.isThetaJoin() && !joinExec.isHashJoin()) {
294+
LOG.warn("Generic theta join is not supported, yet");
295+
return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
296+
}
297+
return joinExec;
298+
}
286299

300+
private CommonJoinExec createJoinExec(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
301+
PhysicalExec rightExec) throws IOException {
287302
switch (joinNode.getJoinType()) {
288303
case CROSS:
289304
return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
@@ -317,7 +332,7 @@ public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode
317332
}
318333
}
319334

320-
private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
335+
private CommonJoinExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
321336
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
322337
Enforcer enforcer = context.getEnforcer();
323338
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -343,7 +358,7 @@ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode pl
343358
}
344359
}
345360

346-
private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
361+
private CommonJoinExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
347362
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
348363
Enforcer enforcer = context.getEnforcer();
349364
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -416,7 +431,7 @@ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode pl
416431
return new PhysicalExec [] {smaller, larger};
417432
}
418433

419-
private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
434+
private CommonJoinExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
420435
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
421436
boolean inMemoryHashJoin = false;
422437
if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
@@ -434,7 +449,7 @@ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNod
434449
}
435450
}
436451

437-
private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
452+
private CommonJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
438453
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
439454
SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
440455
plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
@@ -455,7 +470,7 @@ private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode
455470
return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
456471
}
457472

458-
private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
473+
private CommonJoinExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
459474
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
460475
Enforcer enforcer = context.getEnforcer();
461476
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -479,7 +494,7 @@ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNod
479494
}
480495
}
481496

482-
private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
497+
private CommonJoinExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
483498
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
484499
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
485500
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
@@ -505,7 +520,7 @@ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, Joi
505520
}
506521
}
507522

508-
private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
523+
private CommonJoinExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
509524
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
510525
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
511526
// blocking, but merge join is blocking as well)
@@ -529,7 +544,7 @@ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNod
529544
}
530545
}
531546

532-
private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
547+
private CommonJoinExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
533548
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
534549
//the left operand is too large, so opt for merge join implementation
535550
LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
@@ -551,7 +566,7 @@ private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, J
551566
return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
552567
}
553568

554-
private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
569+
private CommonJoinExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
555570
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
556571
Enforcer enforcer = context.getEnforcer();
557572
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -573,7 +588,7 @@ private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNo
573588
}
574589
}
575590

576-
private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
591+
private CommonJoinExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
577592
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
578593
Enforcer enforcer = context.getEnforcer();
579594
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -596,7 +611,7 @@ private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNod
596611
}
597612
}
598613

599-
private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
614+
private CommonJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
600615
PhysicalExec leftExec, PhysicalExec rightExec)
601616
throws IOException {
602617
String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
@@ -642,7 +657,7 @@ private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext c
642657
return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
643658
}
644659

645-
private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
660+
private CommonJoinExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
646661
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
647662
String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
648663
String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
@@ -659,7 +674,7 @@ private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, Joi
659674
/**
660675
* Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
661676
*/
662-
private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
677+
private CommonJoinExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
663678
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
664679
Enforcer enforcer = context.getEnforcer();
665680
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -684,7 +699,7 @@ private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode
684699
/**
685700
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
686701
*/
687-
private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
702+
private CommonJoinExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
688703
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
689704
Enforcer enforcer = context.getEnforcer();
690705
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -709,7 +724,7 @@ private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNod
709724
/**
710725
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
711726
*/
712-
private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
727+
private CommonJoinExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
713728
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
714729
Enforcer enforcer = context.getEnforcer();
715730
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
@@ -734,7 +749,7 @@ private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode
734749
/**
735750
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
736751
*/
737-
private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
752+
private CommonJoinExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
738753
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
739754
Enforcer enforcer = context.getEnforcer();
740755
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);

0 commit comments

Comments
 (0)