Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions dataflow/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<apache_beam.version>2.67.0</apache_beam.version>
<apache_beam.version>2.70.0</apache_beam.version>
<slf4j.version>2.0.12</slf4j.version>
<parquet.version>1.14.0</parquet.version>
<iceberg.version>1.4.2</iceberg.version>
<parquet.version>1.16.0</parquet.version>
<iceberg.version>1.10.0</iceberg.version>
<postgresql.version>42.7.3</postgresql.version>
<testcontainers.version>1.20.0</testcontainers.version>
</properties>

<build>
Expand All @@ -65,6 +67,13 @@

<dependencyManagement>
<dependencies>
<!-- Force Avro version for everything. -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<!-- `LogicalTypes.TimestampNanos` was introduced at 1.12.0. -->
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
Expand Down Expand Up @@ -160,6 +169,11 @@
<artifactId>iceberg-gcp</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>

<!-- Kafka -->
<dependency>
Expand All @@ -181,6 +195,27 @@
<scope>test</scope>
</dependency>

<!-- Postgres -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-jdbc</artifactId>
<version>${apache_beam.version}</version>
</dependency>
<dependency>
<!-- For preparing and verifying a test table in the integration test -->
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- For running containerized Postgres instance in the integration test -->
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<!-- Google Cloud -->
<dependency>
<groupId>com.google.cloud</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

// [START dataflow_postgres_read]
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PostgresRead {

public interface Options extends PipelineOptions {
@Description("The jdbc url of PostgreSQL database to read from.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The jdbc url" -> "The JDBC URL"

String getJdbcUrl();

void setJdbcUrl(String value);

@Description("The table of PostgresSQL to read from.")
String getTable();

void setTable(String value);

@Description("The username of PostgreSQL database.")
String getUsername();

void setUsername(String value);

@Description("The password of PostgreSQL database.")
String getPassword();

void setPassword(String value);

@Description("Path to write the output file")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add period at end to match other strings. Also is this a local path, GCS, other? Might be worth being explicit about that unless it's clear / widely known.

String getOutputPath();

void setOutputPath(String value);
}

public static PipelineResult.State main(String[] args) {
// Parse the pipeline options passed into the application. Example:
// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
// --username=$USERNAME --password=$PASSWORD --outputPath=$OUTPUT_FILE
// For more information, see
// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = createPipeline(options);
return pipeline.run().waitUntilFinish();
}

public static Pipeline createPipeline(Options options) {

// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config =
ImmutableMap.<String, Object>builder()
.put("jdbc_url", options.getJdbcUrl())
.put("location", options.getTable())
.put("username", options.getUsername())
.put("password", options.getPassword())
.build();

// Build the pipeline.
var pipeline = Pipeline.create(options);
pipeline
// Read data from Postgres database via managed io.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Options, depending on which makes most sense:
Read data from a Postgres database using Managed I/O.
Read data from a Postgres database using managed IO.

.apply(Managed.read(Managed.POSTGRES).withConfig(config))
.getSinglePCollection()
// Convert each row to a string.
.apply(
MapElements.into(TypeDescriptors.strings())
.via((row -> String.format("%d,%s", row.getInt32("id"), row.getString("name")))))
// Write strings to a text file.
.apply(TextIO.write().to(options.getOutputPath()).withSuffix(".txt").withNumShards(1));
return pipeline;
}
}
// [END dataflow_postgres_read]
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

// [START dataflow_postgres_write]
import static org.apache.beam.sdk.schemas.Schema.toSchema;

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class PostgresWrite {

private static Schema INPUT_SCHEMA =
Stream.of(
Schema.Field.of("id", Schema.FieldType.INT32),
Schema.Field.of("name", Schema.FieldType.STRING))
.collect(toSchema());

private static List<Row> ROWS =
Arrays.asList(
Row.withSchema(INPUT_SCHEMA)
.withFieldValue("id", 1)
.withFieldValue("name", "John Doe")
.build(),
Row.withSchema(INPUT_SCHEMA)
.withFieldValue("id", 2)
.withFieldValue("name", "Jane Smith")
.build());

public interface Options extends PipelineOptions {
@Description("The jdbc url of PostgreSQL database to write to.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JDBC URL

String getJdbcUrl();

void setJdbcUrl(String value);

@Description("The table of PostgresSQL to write to.")
String getTable();

void setTable(String value);

@Description("The username of PostgreSQL database.")
String getUsername();

void setUsername(String value);

@Description("The password of PostgreSQL database.")
String getPassword();

void setPassword(String value);
}

public static PipelineResult.State main(String[] args) {
// Parse the pipeline options passed into the application. Example:
// --runner=DirectRunner --jdbcUrl=$JDBC_URL --table=$TABLE
// --username=$USERNAME --password=$PASSWORD
// For more information, see
// https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = createPipeline(options);
return pipeline.run().waitUntilFinish();
}

public static Pipeline createPipeline(Options options) {

// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config =
ImmutableMap.<String, Object>builder()
.put("jdbc_url", options.getJdbcUrl())
.put("location", options.getTable())
.put("username", options.getUsername())
.put("password", options.getPassword())
.build();

// Build the pipeline.
var pipeline = Pipeline.create(options);
pipeline
// Create data to write to Postgres.
.apply(Create.of(ROWS))
.setRowSchema(INPUT_SCHEMA)
// Write rows to Postgres database via managed io.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as other file

.apply(Managed.write(Managed.POSTGRES).withConfig(config))
.getSinglePCollection();
return pipeline;
}
}
// [END dataflow_postgres_write]
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private void writeTableRecord(Table table)

FileAppender<Record> appender =
Parquet.write(HadoopOutputFile.fromPath(path, hadoopConf))
.createWriterFunc(GenericParquetWriter::buildWriter)
.createWriterFunc(msgType -> GenericParquetWriter.create(table.schema(), msgType))
.schema(table.schema())
.overwrite()
.build();
Expand Down Expand Up @@ -166,7 +166,7 @@ public void tearDown() throws IOException, ExecutionException, InterruptedExcept
RemoteStorageHelper.forceDelete(storage, bucketName, 1, TimeUnit.MINUTES);
}
}

@Test
public void testApacheIcebergRestCatalog() throws IOException, InterruptedException {
String warehouse = "gs://" + bucketName;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.dataflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.beam.sdk.PipelineResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class PostgresReadIT {

private static final String TABLE_NAME = "test_read_table";
private static final String OUTPUT_PATH = "test-output";
// The TextIO connector appends this suffix to the pipeline output file.
private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt";
private static final String OUTPUT_FILE_NAME = OUTPUT_PATH + OUTPUT_FILE_SUFFIX;

private static final PostgreSQLContainer<?> postgres =
new PostgreSQLContainer<>("postgres:15-alpine");

@Before
public void setUp() throws Exception {
postgres.start();

// Initialize the database with table and data
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add period at end

try (Connection conn =
DriverManager.getConnection(
postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword())) {

Statement stmt = conn.createStatement();
stmt.execute(
String.format("CREATE TABLE %s (id INT PRIMARY KEY, name VARCHAR(255))", TABLE_NAME));
stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (1, 'John Doe')", TABLE_NAME));
stmt.execute(String.format("INSERT INTO %s (id, name) VALUES (2, 'Jane Smith')", TABLE_NAME));
}
}

@After
public void tearDown() throws IOException {
if (postgres != null) {
postgres.stop();
}
Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
}

@Test
public void testPostgresRead() throws IOException {
// Execute the Beam pipeline
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

period at end

PipelineResult.State state =
PostgresRead.main(
new String[] {
"--runner=DirectRunner",
"--jdbcUrl=" + postgres.getJdbcUrl(),
"--table=" + TABLE_NAME,
"--username=" + postgres.getUsername(),
"--password=" + postgres.getPassword(),
"--outputPath=" + OUTPUT_PATH
});

assertEquals(PipelineResult.State.DONE, state);
verifyOutput();
}

private void verifyOutput() throws IOException {
File outputFile = new File(OUTPUT_FILE_NAME);
assertTrue("Output file should exist", outputFile.exists());

String content = Files.readString(Paths.get(OUTPUT_FILE_NAME));

assertTrue(content.contains("1,John Doe"));
assertTrue(content.contains("2,Jane Smith"));
}
}
Loading