diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index 94485d36945..9b5db567aa9 100644 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -37,10 +37,12 @@ 11 11 UTF-8 - 2.67.0 + 2.70.0 2.0.12 - 1.14.0 - 1.4.2 + 1.16.0 + 1.10.0 + 42.7.3 + 1.20.0 @@ -65,6 +67,13 @@ + + + org.apache.avro + avro + + 1.12.0 + com.google.cloud libraries-bom @@ -160,6 +169,11 @@ iceberg-gcp ${iceberg.version} + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + @@ -181,6 +195,27 @@ test + + + org.apache.beam + beam-sdks-java-io-jdbc + ${apache_beam.version} + + + + org.postgresql + postgresql + ${postgresql.version} + test + + + + org.testcontainers + postgresql + ${testcontainers.version} + test + + com.google.cloud diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java new file mode 100644 index 00000000000..58be27132a8 --- /dev/null +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java @@ -0,0 +1,97 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +// [START dataflow_postgres_read] +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class PostgresRead { + + public interface Options extends PipelineOptions { + @Description("The jdbc url of PostgreSQL database to read from.") + String getJdbcUrl(); + + void setJdbcUrl(String value); + + @Description("The table of PostgresSQL to read from.") + String getTable(); + + void setTable(String value); + + @Description("The username of PostgreSQL database.") + String getUsername(); + + void setUsername(String value); + + @Description("The password of PostgreSQL database.") + String getPassword(); + + void setPassword(String value); + + @Description("Path to write the output file") + String getOutputPath(); + + void setOutputPath(String value); + } + + public static PipelineResult.State main(String[] args) { + // Parse the pipeline options passed into the application. Example: + // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE + // --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE + // For more information, see + // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = createPipeline(options); + return pipeline.run().waitUntilFinish(); + } + + public static Pipeline createPipeline(Options options) { + + // Create configuration parameters for the Managed I/O transform. + ImmutableMap config = + ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); + + // Build the pipeline. + var pipeline = Pipeline.create(options); + pipeline + // Read data from Postgres database via managed io. + .apply(Managed.read(Managed.POSTGRES).withConfig(config)) + .getSinglePCollection() + // Convert each row to a string. + .apply( + MapElements.into(TypeDescriptors.strings()) + .via((row -> String.format("%d,%s", row.getInt32("id"), row.getString("name"))))) + // Write strings to a text file. + .apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1)); + return pipeline; + } +} +// [END dataflow_postgres_read] diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java new file mode 100644 index 00000000000..7e59bedee8e --- /dev/null +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java @@ -0,0 +1,111 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +// [START dataflow_postgres_write] +import static org.apache.beam.sdk.schemas.Schema.toSchema; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.Row; + +public class PostgresWrite { + + private static Schema INPUT_SCHEMA = + Stream.of( + Schema.Field.of("id", Schema.FieldType.INT32), + Schema.Field.of("name", Schema.FieldType.STRING)) + .collect(toSchema()); + + private static List ROWS = + Arrays.asList( + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 1) + .withFieldValue("name", "John Doe") + .build(), + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 2) + .withFieldValue("name", "Jane Smith") + .build()); + + public interface Options extends PipelineOptions { + @Description("The jdbc url of PostgreSQL database to write to.") + String getJdbcUrl(); + + void setJdbcUrl(String value); + + @Description("The table of PostgresSQL to write to.") + String getTable(); + + void setTable(String value); + + @Description("The username of PostgreSQL database.") + String getUsername(); + + void setUsername(String value); + + @Description("The password of PostgreSQL database.") + String getPassword(); + + void setPassword(String value); + } + + public static PipelineResult.State main(String[] args) { + // Parse the pipeline options passed into the application. Example: + // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE + // --username=$USERNAME --password=$PASSWORD + // For more information, see + // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = createPipeline(options); + return pipeline.run().waitUntilFinish(); + } + + public static Pipeline createPipeline(Options options) { + + // Create configuration parameters for the Managed I/O transform. + ImmutableMap config = + ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); + + // Build the pipeline. + var pipeline = Pipeline.create(options); + pipeline + // Create data to write to Postgres. + .apply(Create.of(ROWS)) + .setRowSchema(INPUT_SCHEMA) + // Write rows to Postgres database via managed io. + .apply(Managed.write(Managed.POSTGRES).withConfig(config)) + .getSinglePCollection(); + return pipeline; + } +} +// [END dataflow_postgres_write] diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java index 432d7455b5a..10a06cc64b7 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java @@ -98,7 +98,7 @@ private void writeTableRecord(Table table) FileAppender appender = Parquet.write(HadoopOutputFile.fromPath(path, hadoopConf)) - .createWriterFunc(GenericParquetWriter::buildWriter) + .createWriterFunc(msgType -> GenericParquetWriter.create(table.schema(), msgType)) .schema(table.schema()) .overwrite() .build(); @@ -166,7 +166,7 @@ public void tearDown() throws IOException, ExecutionException, InterruptedExcept RemoteStorageHelper.forceDelete(storage, bucketName, 1, TimeUnit.MINUTES); } } - + @Test public void testApacheIcebergRestCatalog() throws IOException, InterruptedException { String warehouse = "gs://" + bucketName; diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java new file mode 100644 index 00000000000..bb8edb9dd5a --- /dev/null +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java @@ -0,0 +1,98 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import org.apache.beam.sdk.PipelineResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class PostgresReadIT { + + private static final String TABLE_NAME = "test_read_table"; + private static final String OUTPUT_PATH = "test-output"; + // The TextIO connector appends this suffix to the pipeline output file. + private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt"; + private static final String OUTPUT_FILE_NAME = OUTPUT_PATH + OUTPUT_FILE_SUFFIX; + + private static final PostgreSQLContainer postgres = + new PostgreSQLContainer<>("postgres:15-alpine"); + + @Before + public void setUp() throws Exception { + postgres.start(); + + // Initialize the database with table and data + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { + + Statement stmt = conn.createStatement(); + stmt.execute( + String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME)); + stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); + } + } + + @After + public void tearDown() throws IOException { + if (postgres != null) { + postgres.stop(); + } + Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME)); + } + + @Test + public void testPostgresRead() throws IOException { + // Execute the Beam pipeline + PipelineResult.State state = + PostgresRead.main( + new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword(), + "--outputPath=" + OUTPUT_PATH + }); + + assertEquals(PipelineResult.State.DONE, state); + verifyOutput(); + } + + private void verifyOutput() throws IOException { + File outputFile = new File(OUTPUT_FILE_NAME); + assertTrue("Output file should exist", outputFile.exists()); + + String content = Files.readString(Paths.get(OUTPUT_FILE_NAME)); + + assertTrue(content.contains("1,John Doe")); + assertTrue(content.contains("2,Jane Smith")); + } +} diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java new file mode 100644 index 00000000000..c6b05bf7653 --- /dev/null +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class PostgresWriteIT { + + private static final String TABLE_NAME = "test_write_table"; + private static final PostgreSQLContainer postgres = + new PostgreSQLContainer<>("postgres:15-alpine"); + + @Before + public void setUp() throws Exception { + postgres.start(); + + // Pre-create the table so the Managed I/O can find it. + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { + Statement stmt = conn.createStatement(); + stmt.execute( + String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + } + } + + @After + public void tearDown() { + if (postgres != null) { + postgres.stop(); + } + } + + @Test + public void testPostgresWrite() throws Exception { + // Execute the Beam pipeline + PipelineResult.State state = + PostgresWrite.main( + new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword() + }); + + assertEquals(PipelineResult.State.DONE, state); + verifyDatabaseContent(); + } + + private void verifyDatabaseContent() throws Exception { + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { + Statement stmt = conn.createStatement(); + ResultSet rs = + stmt.executeQuery(String.format("SELECT id, name FROM %s ORDER BY id", TABLE_NAME)); + + List results = new ArrayList<>(); + while (rs.next()) { + results.add(rs.getInt("id") + "," + rs.getString("name")); + } + + assertEquals("Should have 2 rows", 2, results.size()); + assertTrue("Should contain John Doe", results.contains("1,John Doe")); + assertTrue("Should contain Jane Smith", results.contains("2,Jane Smith")); + } + } +}