diff --git a/pom.xml b/pom.xml
index 3107f6bf..0b0648b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,8 @@
     2.8.0
     4.11
     1.6.5
+    <mockito.version>1.10.19</mockito.version>
+    <assertj.version>3.11.1</assertj.version>
     v4-rev581-1.25.0
@@ -249,6 +251,31 @@
       <version>${powermock.version}</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <version>${assertj.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.cdap.cdap</groupId>
+      <artifactId>hydrator-test</artifactId>
+      <version>${cdap.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <version>4.2.2</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/src/test/java/io/cdap/plugin/google/common/GenerateCredentials.java b/src/test/java/io/cdap/plugin/google/common/GenerateCredentials.java
new file mode 100644
index 00000000..2f3e8cc9
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/google/common/GenerateCredentials.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.google.common;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.services.drive.DriveScopes;
+import com.google.common.base.Preconditions;
+import io.cdap.cdap.api.common.Bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Base64;
+import java.util.Collections;
+
+public class GenerateCredentials {
+
+  protected static final String GCP_SERVICE_ACCOUNT_PATH = "google.application.credentials.path";
+  protected static final String GCP_SERVICE_ACCOUNT_BASE64_ENCODED = "google.application.credentials.base64.encoded";
+
+  public GoogleCredential getServiceAccountCredentials() throws IOException {
+    // The credentials may be passed base64-encoded to avoid command-line parsing errors,
+    // since the credentials contain dashes.
+    String property = System.getProperty(GCP_SERVICE_ACCOUNT_BASE64_ENCODED);
+    String serviceAccountCredentials;
+    if (property != null) {
+      serviceAccountCredentials = Bytes.toString(Base64.getDecoder().decode(property));
+    } else {
+      property = Preconditions.checkNotNull(System.getProperty(GCP_SERVICE_ACCOUNT_PATH),
+        "The credentials file provided is null. "
+          + "Please make sure the path is correct and the file exists.");
+      serviceAccountCredentials = new String(Files.readAllBytes(Paths.get(property)), StandardCharsets.UTF_8);
+    }
+
+    GoogleCredential googleCredential = null;
+    if (serviceAccountCredentials != null) {
+      try (InputStream inputStream = new ByteArrayInputStream(
+        serviceAccountCredentials.getBytes(StandardCharsets.UTF_8))) {
+        googleCredential = GoogleCredential.fromStream(inputStream).createScoped(
+          Collections.singletonList(DriveScopes.DRIVE));
+      }
+    }
+    return googleCredential;
+  }
+}
diff --git a/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleDrive.java b/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleDrive.java
new file mode 100644
index 00000000..77f21bd9
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleDrive.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.google.drive.etl;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.FileContent;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.common.utils.Tasks;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.datapipeline.SmartWorkflow;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.test.HydratorTestBase;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.cdap.test.TestConfiguration;
+import io.cdap.cdap.test.WorkflowManager;
+import io.cdap.plugin.google.common.GenerateCredentials;
+import io.cdap.plugin.google.drive.source.GoogleDriveSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class ETLTestGoogleDrive extends HydratorTestBase {
+
+  @ClassRule
+  public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", true);
+  protected static Drive service;
+  protected static String directoryIdentifier;
+  protected static final String APPLICATION_NAME = "Google Drive Test";
+  protected static final Schema INPUT_SCHEMA = Schema.recordOf(
+    "input-record",
+    Schema.Field.of("body", Schema.of(Schema.Type.BYTES)),
+    Schema.Field.of("offset", Schema.of(Schema.Type.LONG)));
+
+  protected static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+  protected static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
+
+  @BeforeClass
+  public static void setupClient() throws Exception {
+
+    // Generate credentials
+    GenerateCredentials credentials = new GenerateCredentials();
+    GoogleCredential googleCredential = credentials.getServiceAccountCredentials();
+
+    final NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+
+    // Create the service
+    service = new Drive.Builder(httpTransport, JSON_FACTORY, googleCredential)
+      .setApplicationName(APPLICATION_NAME)
+      .build();
+
+    // Create the directory
+    File directoryMetadata = new File();
+    directoryMetadata.setName("TestDirectory");
+    directoryMetadata.setMimeType("application/vnd.google-apps.folder");
+    File directory = service.files().create(directoryMetadata)
+      .setFields("id")
+      .execute();
+
+    directoryIdentifier = directory.getId();
+
+    // Populate the directory with a file
+    File fileMetadata = new File();
+    fileMetadata.setName("csvexample.csv");
+    fileMetadata.setParents(Collections.singletonList(directoryIdentifier));
+    java.io.File filePath = new java.io.File("src/test/resources/csvexample.csv");
+    FileContent mediaContent = new FileContent("text/csv", filePath);
+    service.files().create(fileMetadata, mediaContent)
+      .setFields("id, parents")
+      .execute();
+
+    ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
+
+    // add the artifact and mock plugins
+    setupBatchArtifacts(parentArtifact, DataPipelineApp.class);
+
+    // add our plugins artifact with the artifact as its parent.
+    // this will make our plugins available.
+    addPluginArtifact(NamespaceId.DEFAULT.artifact("google-drive-plugins", "1.0.0"),
+                      parentArtifact, GoogleDriveSource.class);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    try {
+      service.files().delete(directoryIdentifier).execute();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Test
+  public void testGoogleDriveWithMacros() throws Exception {
+
+    ETLStage source = new ETLStage("GoogleDriveETLTest", new ETLPlugin(GoogleDriveSource.NAME,
+                                                                       BatchSource.PLUGIN_TYPE,
+                                                                       getSourceMinimalDefaultConfigs(),
+                                                                       null));
+    String outputDatasetName = "output-google_drive_test";
+    ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputDatasetName));
+
+    ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    ImmutableMap<String, String> runtimeProperties =
+      ImmutableMap.of(
+        "serviceAccountType", "filePath",
+        "serviceAccountFilePathWithMacro", "auto-detect");
+
+    ApplicationManager appManager = deployETL(etlConfig, "GoogleDriveWithMacro");
+    runETLOnce(appManager, runtimeProperties);
+
+    DataSetManager<Table> dataset = getDataset(outputDatasetName);
+    List<StructuredRecord> outputRecords = MockSink.readOutput(dataset);
+
+    Assert.assertEquals("Expected records", 1, outputRecords.size());
+  }
+
+  /**
+   * Run the SmartWorkflow in the given ETL application once and wait up to 5 minutes for the
+   * workflow to reach the COMPLETED status.
+   *
+   * @param appManager the ETL application to run
+   * @param arguments the arguments to be passed when running SmartWorkflow
+   */
+  protected WorkflowManager runETLOnce(ApplicationManager appManager, Map<String, String> arguments)
+    throws TimeoutException, InterruptedException, ExecutionException {
+    final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    int numRuns = workflowManager.getHistory().size();
+    workflowManager.start(arguments);
+    Tasks.waitFor(numRuns + 1, () -> workflowManager.getHistory().size(), 20, TimeUnit.SECONDS);
+    workflowManager.waitForStopped(5, TimeUnit.MINUTES);
+    return workflowManager;
+  }
+
+  protected ApplicationManager deployETL(ETLBatchConfig etlConfig, String appName) throws Exception {
+    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(APP_ARTIFACT, etlConfig);
+    ApplicationId appId = NamespaceId.DEFAULT.app(appName);
+    return deployApplication(appId, appRequest);
+  }
+
+  public Map<String, String> getSourceMinimalDefaultConfigs() {
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put("referenceName", "google_drive_with_macro");
+    sourceProps.put("fileTypesToPull", "documents,binary,spreadsheets,drawings,presentations,appsScripts");
+    sourceProps.put("bodyFormat", "string");
+    sourceProps.put("sheetsExportingFormat", "text/plain");
+    sourceProps.put("docsExportingFormat", "text/csv");
+    sourceProps.put("drawingsExportingFormat", "image/svg+xml");
+    sourceProps.put("presentationsExportingFormat", "text/plain");
+    sourceProps.put("maxRetryJitterWait", "100");
+    sourceProps.put("schemaBodyFieldName", "bytes");
+    sourceProps.put("schemaNameFieldName", "field");
+    sourceProps.put("schemaMimeFieldName", "string");
+    sourceProps.put("maxPartitionSize", "0");
+    sourceProps.put("maxRetryWait", "200");
+    sourceProps.put("maxRetryCount", "8");
+    sourceProps.put("modificationDateRange", "today");
+    sourceProps.put("directoryIdentifier", directoryIdentifier);
+    sourceProps.put("authType", "serviceAccount");
+    sourceProps.put("serviceAccountType", "${serviceAccountType}");
+    sourceProps.put("accountFilePath", "${serviceAccountFilePathWithMacro}");
+    sourceProps.put("schema", INPUT_SCHEMA.toString());
+    return sourceProps;
+  }
+}
diff --git a/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleSheets.java b/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleSheets.java
new file mode 100644
index 00000000..c3a360c1
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/google/drive/etl/ETLTestGoogleSheets.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright © 2021 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.google.drive.etl;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.drive.Drive;
+import com.google.api.services.drive.model.File;
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.model.Spreadsheet;
+import com.google.api.services.sheets.v4.model.SpreadsheetProperties;
+import com.google.api.services.sheets.v4.model.ValueRange;
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.artifact.ArtifactSummary;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.common.utils.Tasks;
+import io.cdap.cdap.datapipeline.DataPipelineApp;
+import io.cdap.cdap.datapipeline.SmartWorkflow;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.mock.test.HydratorTestBase;
+import io.cdap.cdap.etl.proto.v2.ETLBatchConfig;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.etl.proto.v2.ETLStage;
+import io.cdap.cdap.proto.artifact.AppRequest;
+import io.cdap.cdap.proto.id.ApplicationId;
+import io.cdap.cdap.proto.id.ArtifactId;
+import io.cdap.cdap.proto.id.NamespaceId;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.cdap.test.TestConfiguration;
+import io.cdap.cdap.test.WorkflowManager;
+import io.cdap.plugin.google.common.GenerateCredentials;
+import io.cdap.plugin.google.sheets.source.GoogleSheetsSource;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class ETLTestGoogleSheets extends HydratorTestBase {
+
+  @ClassRule
+  public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", true);
+  protected static Sheets service;
+  protected static Drive drive;
+  protected static String directoryIdentifier;
+  protected static final String APPLICATION_NAME = "Google Sheets Test";
+  protected static final Schema INPUT_SCHEMA = Schema.recordOf(
+    "input-record",
+    Schema.Field.of("body", Schema.of(Schema.Type.BYTES)),
+    Schema.Field.of("offset", Schema.of(Schema.Type.LONG)));
+
+  protected static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0");
+  protected static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();
+
+
+  @BeforeClass
+  public static void setupClient() throws Exception {
+
+    // Generate credentials
+    GenerateCredentials credentials = new GenerateCredentials();
+    GoogleCredential googleCredential = credentials.getServiceAccountCredentials();
+
+    final NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+
+    // Create the necessary services
+    service = new Sheets.Builder(httpTransport, JSON_FACTORY, googleCredential)
+      .setApplicationName(APPLICATION_NAME)
+      .build();
+
+    drive = new Drive.Builder(httpTransport, JSON_FACTORY, googleCredential)
+      .setApplicationName(APPLICATION_NAME)
+      .build();
+
+    // Create the directory
+    File directoryMetadata = new File();
+    directoryMetadata.setName("TestDirectory");
+    directoryMetadata.setMimeType("application/vnd.google-apps.folder");
+    File directory = drive.files().create(directoryMetadata)
+      .setFields("id")
+      .execute();
+
+    directoryIdentifier = directory.getId();
+
+    // Create the spreadsheet
+    Spreadsheet spreadsheet = new Spreadsheet()
+      .setProperties(new SpreadsheetProperties()
+                       .setTitle("TestSpreadSheet"));
+
+    spreadsheet = service.spreadsheets().create(spreadsheet)
+      .setFields("spreadsheetId")
+      .execute();
+
+    String spreadsheetId = spreadsheet.getSpreadsheetId();
+    String range = "Sheet1!A:A";
+
+    // Add the rows
+    List<Object> row = new ArrayList<>();
+    row.add("TestRow");
+
+    List<List<Object>> values = new ArrayList<>();
+    values.add(row);
+
+    ValueRange valueRange = new ValueRange();
+    valueRange.setMajorDimension("ROWS");
+    valueRange.setValues(values);
+
+    // Append the values
+    service.spreadsheets().values().append(spreadsheetId, range, valueRange).setValueInputOption("RAW").execute();
+
+    // Send the spreadsheet to the created directory
+    drive.files().update(spreadsheet.getSpreadsheetId(), null)
+      .setAddParents(directoryIdentifier)
+      .execute();
+
+    ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
+
+    // add the artifact and mock plugins
+    setupBatchArtifacts(parentArtifact, DataPipelineApp.class);
+
+    // add our plugins artifact with the artifact as its parent.
+    // this will make our plugins available.
+    addPluginArtifact(NamespaceId.DEFAULT.artifact("google-sheets-plugins", "1.0.0"),
+                      parentArtifact, GoogleSheetsSource.class);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    try {
+      drive.files().delete(directoryIdentifier).execute();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Test
+  public void testGoogleSheetsWithMacros() throws Exception {
+
+    ETLStage source = new ETLStage("GoogleSheetsETLTest", new ETLPlugin(GoogleSheetsSource.NAME,
+                                                                        BatchSource.PLUGIN_TYPE,
+                                                                        getSourceMinimalDefaultConfigs(),
+                                                                        null));
+    String outputDatasetName = "output-google_sheets_test";
+    ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputDatasetName));
+
+    ETLBatchConfig etlConfig = ETLBatchConfig.builder()
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    ImmutableMap<String, String> runtimeProperties =
+      ImmutableMap.of("serviceAccountType", "filePath",
+                      "serviceAccountFilePathWithMacro", "auto-detect");
+
+    ApplicationManager appManager = deployETL(etlConfig, "GoogleSheetsWithMacro");
+    runETLOnce(appManager, runtimeProperties);
+
+    DataSetManager<Table> dataset = getDataset(outputDatasetName);
+    List<StructuredRecord> outputRecords = MockSink.readOutput(dataset);
+
+    Assert.assertEquals("Expected records", 999, outputRecords.size());
+  }
+
+  /**
+   * Run the SmartWorkflow in the given ETL application once and wait up to 5 minutes for the
+   * workflow to reach the COMPLETED status.
+   *
+   * @param appManager the ETL application to run
+   * @param arguments the arguments to be passed when running SmartWorkflow
+   */
+  protected WorkflowManager runETLOnce(ApplicationManager appManager, Map<String, String> arguments)
+    throws TimeoutException, InterruptedException, ExecutionException {
+    final WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    int numRuns = workflowManager.getHistory().size();
+    workflowManager.start(arguments);
+    Tasks.waitFor(numRuns + 1, () -> workflowManager.getHistory().size(), 20, TimeUnit.SECONDS);
+    workflowManager.waitForStopped(5, TimeUnit.MINUTES);
+    return workflowManager;
+  }
+
+  protected ApplicationManager deployETL(ETLBatchConfig etlConfig, String appName) throws Exception {
+    AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(APP_ARTIFACT, etlConfig);
+    ApplicationId appId = NamespaceId.DEFAULT.app(appName);
+    return deployApplication(appId, appRequest);
+  }
+
+  public Map<String, String> getSourceMinimalDefaultConfigs() {
+    Map<String, String> sourceProps = new HashMap<>();
+    sourceProps.put("referenceName", "google_sheets_with_macro");
+    sourceProps.put("sheetsToPull", "all");
+    sourceProps.put("bodyFormat", "string");
+    sourceProps.put("formatting", "valuesOnly");
+    sourceProps.put("metadataFieldName", "metadata");
+    sourceProps.put("extractMetadata", "false");
+    sourceProps.put("skipEmptyData", "false");
+    sourceProps.put("maxRetryJitterWait", "100");
+    sourceProps.put("schemaBodyFieldName", "bytes");
+    sourceProps.put("schemaNameFieldName", "field");
+    sourceProps.put("schemaMimeFieldName", "string");
+    sourceProps.put("maxPartitionSize", "0");
+    sourceProps.put("readBufferSize", "100");
+    sourceProps.put("maxRetryWait", "200");
+    sourceProps.put("maxRetryCount", "8");
+    sourceProps.put("modificationDateRange", "today");
+    sourceProps.put("directoryIdentifier", directoryIdentifier);
+    sourceProps.put("authType", "serviceAccount");
+    sourceProps.put("serviceAccountType", "${serviceAccountType}");
+    sourceProps.put("accountFilePath", "${serviceAccountFilePathWithMacro}");
+    sourceProps.put("schema", INPUT_SCHEMA.toString());
+    sourceProps.put("columnNamesSelection", "firstRowAsColumns");
+    sourceProps.put("firstHeaderRow", "1");
+    sourceProps.put("lastDataColumn", "26");
+    sourceProps.put("lastDataRow", "1000");
+    sourceProps.put("addNameFields", "false");
+    return sourceProps;
+  }
+}
diff --git a/src/test/resources/csvexample.csv b/src/test/resources/csvexample.csv
new file mode 100644
index 00000000..a902d62f
--- /dev/null
+++ b/src/test/resources/csvexample.csv
@@ -0,0 +1,6 @@
+Identifier,First_name,Last_name
+901242,Rachel,Booker
+207074,Laura,Grey
+408129,Craig,Johnson
+934600,Mary,Jenkins
+507916,Jamie,Smith
\ No newline at end of file
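
Note on running these tests: GenerateCredentials resolves the service-account key from the two JVM system properties it defines (google.application.credentials.base64.encoded is checked first, then google.application.credentials.path). The sketch below shows one way a test run might populate them before the @BeforeClass setup executes; the class name CredentialsBootstrap and the key-file path are illustrative assumptions, not part of this change.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;

// Illustrative helper (not part of the diff): populates the properties read by
// GenerateCredentials, preferring the base64 form so the JSON key survives
// command-line parsing.
public class CredentialsBootstrap {
  public static void main(String[] args) throws Exception {
    String keyPath = "/path/to/service-account.json"; // assumed location of the key file
    byte[] key = Files.readAllBytes(Paths.get(keyPath));
    System.setProperty("google.application.credentials.base64.encoded",
                       Base64.getEncoder().encodeToString(key));
    // Alternatively, point the tests directly at the file:
    // System.setProperty("google.application.credentials.path", keyPath);
  }
}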