diff --git a/pom.xml b/pom.xml
index 2ac4778046..97bf2b17f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
1.8.2
hadoop2-1.0.0
1.4
- 6.7.0
+ 6.8.0-SNAPSHOT
2.10.0-SNAPSHOT
3.2.6
0.3.1
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
index f9a5800bea..064b0a3141 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
@@ -161,6 +161,12 @@ void initSQLEngineOutput(BatchSinkContext context,
String tableName,
@Nullable Schema tableSchema,
FailureCollector collector) {
+ // Sink Pushdown is not supported if the sink schema is not defined.
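+ // Without a schema, neither the BigQuery field list below nor the serialized
+ // SQL_OUTPUT_SCHEMA argument can be built, so we skip output initialization entirely.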
+ if (tableSchema == null) {
+ LOG.debug("BigQuery SQL Engine Output was not initialized because the sink schema is null.");
+ return;
+ }
+
List<BigQueryTableFieldSchema> fields = BigQuerySinkUtils.getBigQueryTableFields(bigQuery, tableName, tableSchema,
getConfig().isAllowSchemaRelaxation(),
config.getDatasetProject(), config.getDataset(), config.isTruncateTableSet(), collector);
@@ -175,7 +181,7 @@ void initSQLEngineOutput(BatchSinkContext context,
arguments
.put(BigQueryWrite.SQL_OUTPUT_JOB_ID, jobId + "_write")
.put(BigQueryWrite.SQL_OUTPUT_CONFIG, GSON.toJson(config))
- .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, tableSchema != null ? GSON.toJson(tableSchema) : null)
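+ // tableSchema is known to be non-null at this point (see the early return above),
+ // so it can be serialized directly without a null guard.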
+ .put(BigQueryWrite.SQL_OUTPUT_SCHEMA, GSON.toJson(tableSchema))
.put(BigQueryWrite.SQL_OUTPUT_FIELDS, GSON.toJson(fieldNames));
context.addOutput(new SQLEngineOutput(outputName,
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
index a239cfbc87..e8eae1995e 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
@@ -43,10 +43,13 @@
import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -87,19 +90,21 @@ public void testBigQuerySinkInvalidConfig() {
@Test
public void testBigQueryTimePartitionConfig() {
- Schema schema = Schema.recordOf("record",
- Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
- Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
- Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
- Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
- Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
- Schema.Field.of("timestamp",
- Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
+ Schema schema =
+ Schema.recordOf("record",
+ Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+ Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
+ Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
+ Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
+ Schema.Field.of("timestamp",
+ Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
- BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
- "TIME", 0L, 100L, 10L, null);
+ BigQuerySinkConfig config =
+ new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
+ "TIME", 0L, 100L, 10L, null);
config.partitionByField = "dt";
-
+
MockFailureCollector collector = new MockFailureCollector("bqsink");
config.validate(collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
@@ -382,4 +387,19 @@ public void testDatasetWithSpecialCharacters() {
Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new!table?2020"));
Assert.assertEquals("new_table_2020", multiSink.sanitizeOutputName("new^table|2020"));
}
+
+ @Test
+ public void testInitSQLEngineOutputDoesNotInitOutputWithNullSchema() throws Exception {
+ BatchSinkContext sinkContext = mock(BatchSinkContext.class);
+ MockFailureCollector collector = new MockFailureCollector("bqsink");
+
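+ // The schema argument is null: this simulates a sink whose output schema has not been configured.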
+ BigQuerySinkConfig config =
+ new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", null,
+ null, null, null, null, null);
+ BigQuery bigQueryMock = mock(BigQuery.class);
+
+ BigQuerySink sink = new BigQuerySink(config);
+ sink.initSQLEngineOutput(sinkContext, bigQueryMock, "sink", "sink", "table", null, collector);
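+ // With a null schema, initSQLEngineOutput should return early and never register an SQL engine output.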
+ verify(sinkContext, never()).addOutput(any());
+ }
}