diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java index 9b506ef3d8..09fc65906a 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java @@ -215,6 +215,11 @@ protected static boolean isValidJoinDefinition(SQLJoinDefinition sqlJoinDefiniti validationProblems); } + // Validate join stages for join on keys + if (joinDefinition.getCondition().getOp() == JoinCondition.Op.KEY_EQUALITY) { + BigQuerySQLEngineUtils.validateJoinOnKeyStages(joinDefinition, validationProblems); + } + if (!validationProblems.isEmpty()) { LOG.warn("Join operation for stage '{}' could not be executed in BigQuery. Issues found: {}.", sqlJoinDefinition.getDatasetName(), diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java index 6de68432d4..194854f086 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java @@ -25,6 +25,7 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.engine.sql.SQLEngineException; import io.cdap.cdap.etl.api.join.JoinCondition; +import io.cdap.cdap.etl.api.join.JoinDefinition; import io.cdap.cdap.etl.api.join.JoinStage; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngineConfig; @@ -214,6 +215,35 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression } } + /** + * Validates stages for a Join on Key operation + * + * TODO: Update logic once BQ SQL engine joins support multiple outer join tables + * + * @param joinDefinition Join Definition to validate + * @param validationProblems List of validation problems to use to append messages + */ + public static void validateJoinOnKeyStages(JoinDefinition joinDefinition, List validationProblems) { + // 2 stages are not an issue + if (joinDefinition.getStages().size() < 3) { + return; + } + + // For 3 or more stages, we only support inner joins. + boolean isInnerJoin = true; + + // If any of the stages is not required, this is an outer join + for (JoinStage stage : joinDefinition.getStages()) { + isInnerJoin &= stage.isRequired(); + } + + if (!isInnerJoin) { + validationProblems.add( + String.format("Only 2 input stages are supported for outer joins, %d stages supplied.", + joinDefinition.getStages().size())); + } + } + /** * Ensure the Stage name is valid for execution in BQ pushdown. *

@@ -230,6 +260,7 @@ public static boolean isValidIdentifier(String identifier) { /** * Get tags for BQ Pushdown tags + * * @param operation the current operation that is being executed * @return Map containing tags for a job. */ diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineTest.java index 1f340edf53..e982104e98 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineTest.java @@ -75,6 +75,49 @@ public void testIsValidJoinDefinitionOnKey() { Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + Schema outputSchema = + Schema.recordOf("Join", + Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), + Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + + // First join is a right join, second join is a left join + JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build(); + JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build(); + + // null safe + JoinCondition condition = JoinCondition.onKeys() + .addKey(new JoinKey("Shipments", Arrays.asList("id"))) + .addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id"))) + .setNullSafe(false) + .build(); + + JoinDefinition joinDefinition = JoinDefinition.builder() + .select(new JoinField("Shipments", "id", "shipment_id"), + new JoinField("FromAddress", "zip", "from_zip")) + .from(shipments, fromAddresses) + .on(condition) + .setOutputSchemaName("Join") + .setOutputSchema(outputSchema) + .build(); + + SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition); + + Assert.assertTrue(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition)); + verify(logger, times(0)).warn(anyString(), anyString(), anyString()); + } + + @Test + public void testInnerJoinFor3StagesIsSupported() { + Schema shipmentSchema = + Schema.recordOf("Shipments", + Schema.Field.of("id", Schema.of(Schema.Type.INT))); + + Schema fromAddressSchema = + Schema.recordOf("FromAddress", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), + Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + Schema toAddressSchema = Schema.recordOf("ToAddress", Schema.Field.of("id", Schema.of(Schema.Type.INT)), @@ -116,6 +159,74 @@ public void testIsValidJoinDefinitionOnKey() { verify(logger, times(0)).warn(anyString(), anyString(), anyString()); } + @Test + public void testOuterJoinFor3StagesIsNotSupported() { + ArgumentCaptor messageTemplateCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor stageNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor issuesCaptor = ArgumentCaptor.forClass(String.class); + + Schema shipmentSchema = + Schema.recordOf("Shipments", + Schema.Field.of("id", Schema.of(Schema.Type.INT))); + + Schema fromAddressSchema = + Schema.recordOf("FromAddress", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), + Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + + Schema toAddressSchema = + Schema.recordOf("ToAddress", + Schema.Field.of("id", Schema.of(Schema.Type.INT)), + Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), + Schema.Field.of("zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + + Schema outputSchema = + Schema.recordOf("Join", + Schema.Field.of("shipment_id", Schema.of(Schema.Type.INT)), + Schema.Field.of("from_zip", Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("to_zip", Schema.nullableOf(Schema.of(Schema.Type.INT)))); + + // First join is a right join, second join is a left join + JoinStage shipments = JoinStage.builder("Shipments", shipmentSchema).setRequired(true).build(); + JoinStage fromAddresses = JoinStage.builder("FromAddress", fromAddressSchema).setRequired(true).build(); + JoinStage toAddresses = JoinStage.builder("ToAddress", toAddressSchema).setRequired(false).build(); + + // null safe + JoinCondition condition = JoinCondition.onKeys() + .addKey(new JoinKey("Shipments", Arrays.asList("id"))) + .addKey(new JoinKey("FromAddress", Arrays.asList("shipment_id"))) + .addKey(new JoinKey("ToAddress", Arrays.asList("shipment_id"))) + .setNullSafe(false) + .build(); + + JoinDefinition joinDefinition = JoinDefinition.builder() + .select(new JoinField("Shipments", "id", "shipment_id"), + new JoinField("FromAddress", "zip", "from_zip"), + new JoinField("ToAddress", "zip", "to_zip")) + .from(shipments, fromAddresses, toAddresses) + .on(condition) + .setOutputSchemaName("Join") + .setOutputSchema(outputSchema) + .build(); + + SQLJoinDefinition sqlJoinDefinition = new SQLJoinDefinition("Join", joinDefinition); + + Assert.assertFalse(BigQuerySQLEngine.isValidJoinDefinition(sqlJoinDefinition)); + verify(logger).warn(messageTemplateCaptor.capture(), stageNameCaptor.capture(), issuesCaptor.capture()); + + String messageTemplate = messageTemplateCaptor.getValue(); + Assert.assertTrue(messageTemplate.contains( + "Join operation for stage '{}' could not be executed in BigQuery. Issues found:")); + + String stageName = stageNameCaptor.getValue(); + Assert.assertEquals("Join", stageName); + + String issues = issuesCaptor.getValue(); + Assert.assertTrue(issues.contains( + "Only 2 input stages are supported for outer joins, 3 stages supplied.")); + } + @Test public void testIsValidJoinDefinitionOnKeyWithErrors() { ArgumentCaptor messageTemplateCaptor = ArgumentCaptor.forClass(String.class);