From 3c15a74a3dceb5ea183309c5f9dc0fc6a2a19561 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 2 Mar 2026 21:50:20 -0500 Subject: [PATCH 1/7] Fix missing symbol error while upgrading beam to 2.69. --- dataflow/snippets/pom.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index 94485d36945..d0ca31e4ade 100644 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -37,7 +37,7 @@ 11 11 UTF-8 - 2.67.0 + 2.69.0 2.0.12 1.14.0 1.4.2 @@ -160,6 +160,11 @@ iceberg-gcp ${iceberg.version} + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + From e5184622fbcff52866254ad7a31b7ff6bfadd305 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 2 Mar 2026 21:53:18 -0500 Subject: [PATCH 2/7] Add an example for reading from postgres via managed io. --- dataflow/snippets/pom.xml | 20 ++++ .../com/example/dataflow/PostgresRead.java | 103 ++++++++++++++++++ .../com/example/dataflow/PostgresReadIT.java | 96 ++++++++++++++++ 3 files changed, 219 insertions(+) create mode 100644 dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java create mode 100644 dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index d0ca31e4ade..2d18b6bb777 100644 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -186,6 +186,26 @@ test + + + org.apache.beam + beam-sdks-java-io-jdbc + ${apache_beam.version} + + + + org.postgresql + postgresql + 42.7.3 + + + + org.testcontainers + postgresql + 1.20.0 + test + + com.google.cloud diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java new file mode 100644 index 00000000000..9b948e1e2f4 --- /dev/null +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java @@ -0,0 +1,103 @@ +/* + * Copyright 2026 Google LLC + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +// [START dataflow_postgres_read] +import com.google.common.collect.ImmutableMap; +import java.io.UnsupportedEncodingException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class PostgresRead { + + public interface Options extends PipelineOptions { + @Description("The jdbc url of PostgreSQL database to read from.") + String getJdbcUrl(); + + void setJdbcUrl(String value); + + @Description("The table of PostgreSQL to read from.") + String getTable(); + + void setTable(String value); + + @Description("The username of PostgreSQL database.") + String getUsername(); + + void setUsername(String value); + + @Description("The password of PostgreSQL database.") + String getPassword(); + + void setPassword(String value); + + @Description("Path to write the output file") + String getOutputPath(); + + void setOutputPath(String value); + } + + public static PipelineResult.State main(String[] args) { + // Parse the pipeline options passed into the application. 
Example: + // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE + // --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE + // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = createPipeline(options); + return pipeline.run().waitUntilFinish(); + } + + public static Pipeline createPipeline(Options options) { + + // Create configuration parameters for the Managed I/O transform. + ImmutableMap config = ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); + + // Build the pipeline. + var pipeline = Pipeline.create(options); + pipeline + // Read data from Postgres database via managed io. + .apply(Managed.read(Managed.POSTGRES).withConfig(config)).getSinglePCollection() + // Convert each row to a string. + .apply(MapElements + .into(TypeDescriptors.strings()) + .via((row -> { + var id = row.getInt32("id"); + var name = row.getString("name"); + return String.format("%d,%s", id, name); + }))) + // Write strings to a text file. + .apply(TextIO + .write() + .to(options.getOutputPath()) + .withSuffix(".txt") + .withNumShards(1)); + return pipeline; + } +} +// [END dataflow_postgres_read] \ No newline at end of file diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java new file mode 100644 index 00000000000..9f4381e2a19 --- /dev/null +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java @@ -0,0 +1,96 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import org.apache.beam.sdk.PipelineResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class PostgresReadIT { + + private static final String TABLE_NAME = "test_table"; + private static final String OUTPUT_PATH = "test-output"; + // The TextIO connector appends this suffix to the pipeline output file. 
+ private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt"; + private static final String OUTPUT_FILE_NAME = OUTPUT_PATH + OUTPUT_FILE_SUFFIX; + + private static final PostgreSQLContainer postgres = + new PostgreSQLContainer<>("postgres:15-alpine"); + + @Before + public void setUp() throws Exception { + postgres.start(); + + // Initialize the database with table and data + try (Connection conn = DriverManager.getConnection( + postgres.getJdbcUrl(), + postgres.getUsername(), + postgres.getPassword())) { + + Statement stmt = conn.createStatement(); + stmt.execute(String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME)); + stmt.execute(String.format("INSERT INTO test_table (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); + } + } + + @After + public void tearDown() throws IOException { + if (postgres != null) { + postgres.stop(); + } + Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME)); + } + + @Test + public void testPostgresRead() throws IOException { + // Execute the Beam pipeline + PipelineResult.State state = PostgresRead.main(new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword(), + "--outputPath=" + OUTPUT_PATH + }); + + assertEquals(PipelineResult.State.DONE, state); + verifyOutput(); + } + + private void verifyOutput() throws IOException { + File outputFile = new File(OUTPUT_FILE_NAME); + assertTrue("Output file should exist", outputFile.exists()); + + String content = Files.readString(Paths.get(OUTPUT_FILE_NAME)); + + assertTrue(content.contains("1,John Doe")); + assertTrue(content.contains("2,Jane Smith")); + } +} \ No newline at end of file From dd86fb77cd635bf19cba832513120434d2e3a3ba Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 2 Mar 2026 23:10:10 -0500 Subject: [PATCH 
3/7] Add an example for write to postgres via managed io. --- .../com/example/dataflow/PostgresWrite.java | 113 ++++++++++++++++++ .../com/example/dataflow/PostgresWriteIT.java | 94 +++++++++++++++ 2 files changed, 207 insertions(+) create mode 100644 dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java create mode 100644 dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java new file mode 100644 index 00000000000..75217c7d692 --- /dev/null +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java @@ -0,0 +1,113 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.dataflow; + +// [START dataflow_postgres_write] +import static org.apache.beam.sdk.schemas.Schema.toSchema; + +import com.google.common.collect.ImmutableMap; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +public class PostgresWrite { + + private static Schema INPUT_SCHEMA = + Stream.of( + Schema.Field.of( + "id", Schema.FieldType.INT32), + Schema.Field.of( + "name", Schema.FieldType.STRING)) + .collect(toSchema()); + + private static List ROWS = + Arrays.asList( + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 1) + .withFieldValue("name", "John Doe") + .build(), + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 2) + .withFieldValue("name", "Jane Smith") + .build()); + + public interface Options extends PipelineOptions { + @Description("The jdbc url of PostgreSQL database to write to.") + String getJdbcUrl(); + + void setJdbcUrl(String value); + + @Description("The table of PostgreSQL to write to.") + String getTable(); + + void setTable(String value); + + @Description("The username of PostgreSQL database.") + String getUsername(); + + void setUsername(String value); + + @Description("The password of PostgreSQL database.") + String getPassword(); + + void setPassword(String value); + } + + public static PipelineResult.State main(String[] args) { + // Parse the pipeline options passed into 
the application. Example: + // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE + // --username=$USERNAME --password=$PASSWORD + // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline pipeline = createPipeline(options); + return pipeline.run().waitUntilFinish(); + } + + public static Pipeline createPipeline(Options options) { + + // Create configuration parameters for the Managed I/O transform. + ImmutableMap config = ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); + + // Build the pipeline. + var pipeline = Pipeline.create(options); + pipeline + // Create data to write to Postgres. + .apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA) + // Write rows to Postgres database via managed io. + .apply(Managed.write(Managed.POSTGRES).withConfig(config)).getSinglePCollection(); + return pipeline; + } +} +// [END dataflow_postgres_write] \ No newline at end of file diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java new file mode 100644 index 00000000000..25d4df5daac --- /dev/null +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java @@ -0,0 +1,94 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.dataflow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.PipelineResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.testcontainers.containers.PostgreSQLContainer; + +public class PostgresWriteIT { + + private static final String TABLE_NAME = "test_table"; + private static final PostgreSQLContainer postgres = + new PostgreSQLContainer<>("postgres:15-alpine"); + + @Before + public void setUp() throws Exception { + postgres.start(); + + // Pre-create the table so the Managed I/O can find it. 
+ try (Connection conn = DriverManager.getConnection( + postgres.getJdbcUrl(), + postgres.getUsername(), + postgres.getPassword())) { + Statement stmt = conn.createStatement(); + stmt.execute(String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + } + } + + @After + public void tearDown() { + if (postgres != null) { + postgres.stop(); + } + } + + @Test + public void testPostgresWrite() throws Exception { + // Execute the Beam pipeline + PipelineResult.State state = PostgresWrite.main(new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword() + }); + + assertEquals(PipelineResult.State.DONE, state); + verifyDatabaseContent(); + } + + private void verifyDatabaseContent() throws Exception { + try (Connection conn = DriverManager.getConnection( + postgres.getJdbcUrl(), + postgres.getUsername(), + postgres.getPassword())) { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(String.format("SELECT id, name FROM %s ORDER BY id", TABLE_NAME)); + + List results = new ArrayList<>(); + while (rs.next()) { + results.add(rs.getInt("id") + "," + rs.getString("name")); + } + + assertEquals("Should have 2 rows", 2, results.size()); + assertTrue("Should contain John Doe", results.contains("1,John Doe")); + assertTrue("Should contain Jane Smith", results.contains("2,Jane Smith")); + } + } +} \ No newline at end of file From 91593b60bc2be92e340e85c674d9109ea54ee60d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 2 Mar 2026 23:27:00 -0500 Subject: [PATCH 4/7] Reformat using google-java-format --- .../com/example/dataflow/PostgresRead.java | 43 +++++++------- .../com/example/dataflow/PostgresWrite.java | 56 +++++++++---------- .../com/example/dataflow/PostgresReadIT.java | 33 ++++++----- .../com/example/dataflow/PostgresWriteIT.java | 38 +++++++------ 4 files changed, 86 insertions(+), 84 
deletions(-) diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java index 9b948e1e2f4..96593fe46e1 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java @@ -18,7 +18,6 @@ // [START dataflow_postgres_read] import com.google.common.collect.ImmutableMap; -import java.io.UnsupportedEncodingException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; @@ -62,7 +61,8 @@ public static PipelineResult.State main(String[] args) { // Parse the pipeline options passed into the application. Example: // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE // --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE - // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + // For more information, see + // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline pipeline = createPipeline(options); return pipeline.run().waitUntilFinish(); @@ -71,33 +71,32 @@ public static PipelineResult.State main(String[] args) { public static Pipeline createPipeline(Options options) { // Create configuration parameters for the Managed I/O transform. - ImmutableMap config = ImmutableMap.builder() - .put("jdbc_url", options.getJdbcUrl()) - .put("location", options.getTable()) - .put("username", options.getUsername()) - .put("password", options.getPassword()) - .build(); + ImmutableMap config = + ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); // Build the pipeline. 
var pipeline = Pipeline.create(options); pipeline // Read data from Postgres database via managed io. - .apply(Managed.read(Managed.POSTGRES).withConfig(config)).getSinglePCollection() + .apply(Managed.read(Managed.POSTGRES).withConfig(config)) + .getSinglePCollection() // Convert each row to a string. - .apply(MapElements - .into(TypeDescriptors.strings()) - .via((row -> { - var id = row.getInt32("id"); - var name = row.getString("name"); - return String.format("%d,%s", id, name); - }))) + .apply( + MapElements.into(TypeDescriptors.strings()) + .via( + (row -> { + var id = row.getInt32("id"); + var name = row.getString("name"); + return String.format("%d,%s", id, name); + }))) // Write strings to a text file. - .apply(TextIO - .write() - .to(options.getOutputPath()) - .withSuffix(".txt") - .withNumShards(1)); + .apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1)); return pipeline; } } -// [END dataflow_postgres_read] \ No newline at end of file +// [END dataflow_postgres_read] diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java index 75217c7d692..7e59bedee8e 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresWrite.java @@ -20,43 +20,37 @@ import static org.apache.beam.sdk.schemas.Schema.toSchema; import com.google.common.collect.ImmutableMap; -import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.List; import java.util.stream.Stream; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; 
import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; public class PostgresWrite { private static Schema INPUT_SCHEMA = - Stream.of( - Schema.Field.of( - "id", Schema.FieldType.INT32), - Schema.Field.of( - "name", Schema.FieldType.STRING)) - .collect(toSchema()); + Stream.of( + Schema.Field.of("id", Schema.FieldType.INT32), + Schema.Field.of("name", Schema.FieldType.STRING)) + .collect(toSchema()); private static List ROWS = - Arrays.asList( - Row.withSchema(INPUT_SCHEMA) - .withFieldValue("id", 1) - .withFieldValue("name", "John Doe") - .build(), - Row.withSchema(INPUT_SCHEMA) - .withFieldValue("id", 2) - .withFieldValue("name", "Jane Smith") - .build()); + Arrays.asList( + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 1) + .withFieldValue("name", "John Doe") + .build(), + Row.withSchema(INPUT_SCHEMA) + .withFieldValue("id", 2) + .withFieldValue("name", "Jane Smith") + .build()); public interface Options extends PipelineOptions { @Description("The jdbc url of PostgreSQL database to write to.") @@ -84,7 +78,8 @@ public static PipelineResult.State main(String[] args) { // Parse the pipeline options passed into the application. 
Example: // --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE // --username=$USERNAME --password=$PASSWORD - // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options + // For more information, see + // https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline pipeline = createPipeline(options); return pipeline.run().waitUntilFinish(); @@ -93,21 +88,24 @@ public static PipelineResult.State main(String[] args) { public static Pipeline createPipeline(Options options) { // Create configuration parameters for the Managed I/O transform. - ImmutableMap config = ImmutableMap.builder() - .put("jdbc_url", options.getJdbcUrl()) - .put("location", options.getTable()) - .put("username", options.getUsername()) - .put("password", options.getPassword()) - .build(); + ImmutableMap config = + ImmutableMap.builder() + .put("jdbc_url", options.getJdbcUrl()) + .put("location", options.getTable()) + .put("username", options.getUsername()) + .put("password", options.getPassword()) + .build(); // Build the pipeline. var pipeline = Pipeline.create(options); pipeline // Create data to write to Postgres. - .apply(Create.of(ROWS)).setRowSchema(INPUT_SCHEMA) + .apply(Create.of(ROWS)) + .setRowSchema(INPUT_SCHEMA) // Write rows to Postgres database via managed io. 
- .apply(Managed.write(Managed.POSTGRES).withConfig(config)).getSinglePCollection(); + .apply(Managed.write(Managed.POSTGRES).withConfig(config)) + .getSinglePCollection(); return pipeline; } } -// [END dataflow_postgres_write] \ No newline at end of file +// [END dataflow_postgres_write] diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java index 9f4381e2a19..a58457d083c 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java @@ -48,15 +48,16 @@ public void setUp() throws Exception { postgres.start(); // Initialize the database with table and data - try (Connection conn = DriverManager.getConnection( - postgres.getJdbcUrl(), - postgres.getUsername(), - postgres.getPassword())) { + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { Statement stmt = conn.createStatement(); - stmt.execute(String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + stmt.execute( + String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME)); - stmt.execute(String.format("INSERT INTO test_table (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); + stmt.execute( + String.format("INSERT INTO test_table (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); } } @@ -71,14 +72,16 @@ public void tearDown() throws IOException { @Test public void testPostgresRead() throws IOException { // Execute the Beam pipeline - PipelineResult.State state = PostgresRead.main(new String[] { - "--runner=DirectRunner", - "--jdbcUrl=" + postgres.getJdbcUrl(), - "--table=" + TABLE_NAME, - "--username=" + postgres.getUsername(), - "--password=" + postgres.getPassword(), - 
"--outputPath=" + OUTPUT_PATH - }); + PipelineResult.State state = + PostgresRead.main( + new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword(), + "--outputPath=" + OUTPUT_PATH + }); assertEquals(PipelineResult.State.DONE, state); verifyOutput(); @@ -93,4 +96,4 @@ private void verifyOutput() throws IOException { assertTrue(content.contains("1,John Doe")); assertTrue(content.contains("2,Jane Smith")); } -} \ No newline at end of file +} diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java index 25d4df5daac..86e297a9154 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java @@ -42,12 +42,12 @@ public void setUp() throws Exception { postgres.start(); // Pre-create the table so the Managed I/O can find it. 
- try (Connection conn = DriverManager.getConnection( - postgres.getJdbcUrl(), - postgres.getUsername(), - postgres.getPassword())) { + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { Statement stmt = conn.createStatement(); - stmt.execute(String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); + stmt.execute( + String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); } } @@ -61,25 +61,27 @@ public void tearDown() { @Test public void testPostgresWrite() throws Exception { // Execute the Beam pipeline - PipelineResult.State state = PostgresWrite.main(new String[] { - "--runner=DirectRunner", - "--jdbcUrl=" + postgres.getJdbcUrl(), - "--table=" + TABLE_NAME, - "--username=" + postgres.getUsername(), - "--password=" + postgres.getPassword() - }); + PipelineResult.State state = + PostgresWrite.main( + new String[] { + "--runner=DirectRunner", + "--jdbcUrl=" + postgres.getJdbcUrl(), + "--table=" + TABLE_NAME, + "--username=" + postgres.getUsername(), + "--password=" + postgres.getPassword() + }); assertEquals(PipelineResult.State.DONE, state); verifyDatabaseContent(); } private void verifyDatabaseContent() throws Exception { - try (Connection conn = DriverManager.getConnection( - postgres.getJdbcUrl(), - postgres.getUsername(), - postgres.getPassword())) { + try (Connection conn = + DriverManager.getConnection( + postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) { Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(String.format("SELECT id, name FROM %s ORDER BY id", TABLE_NAME)); + ResultSet rs = + stmt.executeQuery(String.format("SELECT id, name FROM %s ORDER BY id", TABLE_NAME)); List results = new ArrayList<>(); while (rs.next()) { @@ -91,4 +93,4 @@ private void verifyDatabaseContent() throws Exception { assertTrue("Should contain Jane Smith", results.contains("2,Jane Smith")); } 
} -} \ No newline at end of file +} From 57d2b5a26cd2d16732dd3e795f10c372c20e7baf Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 2 Mar 2026 23:32:24 -0500 Subject: [PATCH 5/7] Minor change on table names. --- .../src/test/java/com/example/dataflow/PostgresReadIT.java | 5 ++--- .../src/test/java/com/example/dataflow/PostgresWriteIT.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java index a58457d083c..bb8edb9dd5a 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresReadIT.java @@ -34,7 +34,7 @@ public class PostgresReadIT { - private static final String TABLE_NAME = "test_table"; + private static final String TABLE_NAME = "test_read_table"; private static final String OUTPUT_PATH = "test-output"; // The TextIO connector appends this suffix to the pipeline output file. 
private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt"; @@ -56,8 +56,7 @@ public void setUp() throws Exception { stmt.execute( String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME)); stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME)); - stmt.execute( - String.format("INSERT INTO test_table (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); + stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME)); } } diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java index 86e297a9154..c6b05bf7653 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/PostgresWriteIT.java @@ -33,7 +33,7 @@ public class PostgresWriteIT { - private static final String TABLE_NAME = "test_table"; + private static final String TABLE_NAME = "test_write_table"; private static final PostgreSQLContainer postgres = new PostgreSQLContainer<>("postgres:15-alpine"); From be51dc122f911a704efe28530023f9b2b420e39f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Mar 2026 11:00:13 -0500 Subject: [PATCH 6/7] Fix iceberg test due to beam version upgrade. 
--- dataflow/snippets/pom.xml | 13 ++++++++++--- .../java/com/example/dataflow/ApacheIcebergIT.java | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index 2d18b6bb777..1dc8e77bf78 100644 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -37,10 +37,10 @@ 11 11 UTF-8 - 2.69.0 + 2.70.0 2.0.12 - 1.14.0 - 1.4.2 + 1.16.0 + 1.10.0 @@ -65,6 +65,13 @@ + + + org.apache.avro + avro + + 1.12.0 + com.google.cloud libraries-bom diff --git a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java index 432d7455b5a..10a06cc64b7 100644 --- a/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java +++ b/dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java @@ -98,7 +98,7 @@ private void writeTableRecord(Table table) FileAppender appender = Parquet.write(HadoopOutputFile.fromPath(path, hadoopConf)) - .createWriterFunc(GenericParquetWriter::buildWriter) + .createWriterFunc(msgType -> GenericParquetWriter.create(table.schema(), msgType)) .schema(table.schema()) .overwrite() .build(); @@ -166,7 +166,7 @@ public void tearDown() throws IOException, ExecutionException, InterruptedExcept RemoteStorageHelper.forceDelete(storage, bucketName, 1, TimeUnit.MINUTES); } } - + @Test public void testApacheIcebergRestCatalog() throws IOException, InterruptedException { String warehouse = "gs://" + bucketName; From e21d4d67e9d55f290c43e2b8429bb88811decc81 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 3 Mar 2026 11:06:21 -0500 Subject: [PATCH 7/7] Address reviews. 
--- dataflow/snippets/pom.xml | 7 +++++-- .../src/main/java/com/example/dataflow/PostgresRead.java | 7 +------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dataflow/snippets/pom.xml b/dataflow/snippets/pom.xml index 1dc8e77bf78..9b5db567aa9 100644 --- a/dataflow/snippets/pom.xml +++ b/dataflow/snippets/pom.xml @@ -41,6 +41,8 @@ 2.0.12 1.16.0 1.10.0 + 42.7.3 + 1.20.0 @@ -203,13 +205,14 @@ org.postgresql postgresql - 42.7.3 + ${postgresql.version} + test org.testcontainers postgresql - 1.20.0 + ${testcontainers.version} test diff --git a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java index 96593fe46e1..58be27132a8 100644 --- a/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java +++ b/dataflow/snippets/src/main/java/com/example/dataflow/PostgresRead.java @@ -88,12 +88,7 @@ public static Pipeline createPipeline(Options options) { // Convert each row to a string. .apply( MapElements.into(TypeDescriptors.strings()) - .via( - (row -> { - var id = row.getInt32("id"); - var name = row.getString("name"); - return String.format("%d,%s", id, name); - }))) + .via((row -> String.format("%d,%s", row.getInt32("id"), row.getString("name"))))) // Write strings to a text file. .apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1)); return pipeline;