iterator4 = result4.collect()) {
+ long deadline4 = System.currentTimeMillis() + Duration.ofSeconds(90).toMillis();
+ while (!(foundOrd001 && foundOrd002) && System.currentTimeMillis() < deadline4) {
+ if (iterator4.hasNext()) {
+ Row row = iterator4.next();
+ Object orderIdField = row.getField(0);
+ if (orderIdField == null) {
+ LOG.warn(" Skipping row with null order_id: {}", row);
+ continue;
+ }
+ String orderId = orderIdField.toString();
+ LOG.info(" Row: {}", row);
+
+ if ("ORD-001".equals(orderId)) {
+ foundOrd001 = true;
+ // Verify nested customer
+ Row customer = (Row) row.getField(3);
+ if (!"John Doe".equals(customer.getField(0))) {
+ LOG.error("FAIL: customer.name mismatch");
+ System.exit(1);
+ }
+ // Verify notes is not null
+ if (row.getField(7) == null) {
+ LOG.error("FAIL: ORD-001 notes should not be null");
+ System.exit(1);
+ }
+ LOG.info(" ORD-001 verified: nested ROW, ARRAY, MAP, DECIMAL, TIMESTAMP OK");
+ }
+
+ if ("ORD-002".equals(orderId)) {
+ foundOrd002 = true;
+ // Verify nullable notes is null
+ if (row.getField(7) != null) {
+ LOG.error("FAIL: ORD-002 notes should be null, got: {}", row.getField(7));
+ System.exit(1);
+ }
+ LOG.info(" ORD-002 verified: nullable field correctly null");
+ }
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ }
+
+ if (foundOrd001 && foundOrd002) {
+ LOG.info("PASS test 4: complex types round-trip works (nested ROW, ARRAY, MAP, DECIMAL, TIMESTAMP, nullable)");
+ } else {
+ LOG.error("FAIL test 4: did not find ORD-001 and ORD-002");
+ System.exit(1);
+ }
+
+ // ================================================================
+ // TEST 5: Schema compatibility — BACKWARD
+ //
+ // BACKWARD compatibility means new schema can read data written
+ // with the old schema. Removing a REQUIRED field is NOT allowed
+ // because a new reader would be missing a field that old data has.
+ //
+ // Strategy: use NOT NULL fields so Flink generates required Avro
+ // fields (plain "string" instead of ["null", "string"] union).
+ // 1. v1: all NOT NULL fields → required Avro fields
+ // 2. v2: add a nullable field → backward-compatible
+ // 3. v3: remove a NOT NULL field → should FAIL (BACKWARD violation)
+ //
+ // IMPORTANT: delete the 'flink-avro-glue-e2e-compat-backward'
+ // schema from GSR before re-running this test.
+ // ================================================================
+ LOG.info("=== Test 5: Schema compatibility — BACKWARD ===");
+ String backwardSchemaName = "flink-avro-glue-e2e-compat-backward";
+
+ // Step 1: v1 schema — all fields NOT NULL (required in Avro)
+ tEnv.executeSql(
+ "CREATE TABLE compat_bw_sink_v1 ("
+ + " user_name STRING NOT NULL,"
+ + " age INT NOT NULL,"
+ + " city STRING NOT NULL"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + backwardSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'BACKWARD'"
+ + ")");
+
+ LOG.info("Test 5 step 1: Writing v1 data (3 NOT NULL fields)...");
+ tEnv.executeSql(
+ "INSERT INTO compat_bw_sink_v1 VALUES ('Alice', 30, 'Seattle')")
+ .await(120, TimeUnit.SECONDS);
+ LOG.info("Test 5 step 1: v1 write succeeded (schema registered with BACKWARD compat)");
+
+ // Step 2: v2 schema — add a nullable field (backward-compatible)
+ tEnv.executeSql(
+ "CREATE TABLE compat_bw_sink_v2 ("
+ + " user_name STRING NOT NULL,"
+ + " age INT NOT NULL,"
+ + " city STRING NOT NULL,"
+ + " email STRING"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + backwardSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'BACKWARD'"
+ + ")");
+
+ LOG.info("Test 5 step 2: Writing v2 data (added nullable email)...");
+ tEnv.executeSql(
+ "INSERT INTO compat_bw_sink_v2 VALUES ('Bob', 25, 'Portland', 'bob@example.com')")
+ .await(120, TimeUnit.SECONDS);
+ LOG.info("Test 5 step 2: v2 write succeeded (adding nullable field is backward-compatible)");
+
+ // Step 3: v3 schema — remove required 'city' field
+ // This is a true BACKWARD violation: old data has a required 'city'
+ // field, but the new reader schema doesn't have it. The new reader
+ // cannot read old data that contains a required field it doesn't know.
+ tEnv.executeSql(
+ "CREATE TABLE compat_bw_sink_v3 ("
+ + " user_name STRING NOT NULL,"
+ + " age INT NOT NULL"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + backwardSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'BACKWARD'"
+ + ")");
+
+ LOG.info("Test 5 step 3: Writing v3 data (removed required 'city' — BACKWARD violation)...");
+ try {
+ tEnv.executeSql(
+ "INSERT INTO compat_bw_sink_v3 VALUES ('Charlie', 35)")
+ .await(120, TimeUnit.SECONDS);
+ LOG.error("FAIL test 5 step 3: expected schema compatibility rejection but write succeeded");
+ System.exit(1);
+ } catch (Exception e) {
+ LOG.info("Test 5 step 3: Write correctly rejected by GSR: {}", e.getMessage());
+ LOG.info("PASS test 5: BACKWARD compatibility enforced correctly");
+ }
+
+ // ================================================================
+ // TEST 6: Schema compatibility — NONE (no validation)
+ //
+ // With NONE, any schema evolution is allowed. We can freely
+ // add/remove fields without GSR rejecting.
+ // ================================================================
+ LOG.info("=== Test 6: Schema compatibility — NONE ===");
+ String noneSchemaName = "flink-avro-glue-e2e-compat-none";
+
+ // v1: 3 fields
+ tEnv.executeSql(
+ "CREATE TABLE compat_none_sink_v1 ("
+ + " user_name STRING,"
+ + " age INT,"
+ + " city STRING"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + noneSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'NONE'"
+ + ")");
+
+ LOG.info("Test 6 step 1: Writing v1 data with NONE compat...");
+ tEnv.executeSql(
+ "INSERT INTO compat_none_sink_v1 VALUES ('Dave', 40, 'Denver')")
+ .await(120, TimeUnit.SECONDS);
+
+ // v2: completely different schema (2 fields, removed city)
+ tEnv.executeSql(
+ "CREATE TABLE compat_none_sink_v2 ("
+ + " user_name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + noneSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'NONE'"
+ + ")");
+
+ LOG.info("Test 6 step 2: Writing v2 data (incompatible change, should succeed with NONE)...");
+ tEnv.executeSql(
+ "INSERT INTO compat_none_sink_v2 VALUES ('Eve', 28)")
+ .await(120, TimeUnit.SECONDS);
+ LOG.info("PASS test 6: NONE compatibility allows any schema evolution");
+
+ // ================================================================
+ // TEST 7: Schema compatibility — FULL
+ //
+ // FULL = both BACKWARD and FORWARD. Only changes that are
+ // compatible in both directions are allowed. Adding an optional
+ // field with a default is the canonical safe evolution.
+ // ================================================================
+ LOG.info("=== Test 7: Schema compatibility — FULL ===");
+ String fullSchemaName = "flink-avro-glue-e2e-compat-full";
+
+ // v1: 3 fields
+ tEnv.executeSql(
+ "CREATE TABLE compat_full_sink_v1 ("
+ + " user_name STRING,"
+ + " age INT,"
+ + " city STRING"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + fullSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'FULL'"
+ + ")");
+
+ LOG.info("Test 7 step 1: Writing v1 data with FULL compat...");
+ tEnv.executeSql(
+ "INSERT INTO compat_full_sink_v1 VALUES ('Frank', 45, 'Chicago')")
+ .await(120, TimeUnit.SECONDS);
+
+ // v2: add optional field (should succeed under FULL — adding optional is safe both ways)
+ tEnv.executeSql(
+ "CREATE TABLE compat_full_sink_v2 ("
+ + " user_name STRING,"
+ + " age INT,"
+ + " city STRING,"
+ + " email STRING"
+ + ") WITH ("
+ + " 'connector' = 'kinesis',"
+ + " 'stream.arn' = '" + streamArn + "',"
+ + " 'aws.region' = '" + awsRegion + "',"
+ + " 'format' = 'avro-glue',"
+ + " 'avro-glue.aws.region' = '" + awsRegion + "',"
+ + " 'avro-glue.registry.name' = '" + registryName + "',"
+ + " 'avro-glue.schema.name' = '" + fullSchemaName + "',"
+ + " 'avro-glue.schema.autoRegistration' = 'true',"
+ + " 'avro-glue.schema.compatibility' = 'FULL'"
+ + ")");
+
+ LOG.info("Test 7 step 2: Writing v2 data (added optional email)...");
+ tEnv.executeSql(
+ "INSERT INTO compat_full_sink_v2 VALUES ('Grace', 32, 'Boston', 'grace@example.com')")
+ .await(120, TimeUnit.SECONDS);
+ LOG.info("PASS test 7: FULL compatibility allows adding optional fields");
+
+ LOG.info("=== All E2E Tests Passed ===");
+ }
+
+ private static String requireEnv(String name) {
+ String value = System.getenv(name);
+ if (value == null || value.isEmpty()) {
+ System.err.println("ERROR: environment variable " + name + " is required");
+ System.exit(1);
+ }
+ return value;
+ }
+}
diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/main/resources/log4j2.properties b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..cac90c92
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/main/resources/log4j2.properties
@@ -0,0 +1,41 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = Console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
+
+# Reduce noise from Flink internals
+logger.flink.name = org.apache.flink
+logger.flink.level = WARN
+
+# Our test class at INFO
+logger.e2e.name = org.apache.flink.glue.schema.registry.test
+logger.e2e.level = INFO
+
+# GSR format classes at INFO (for debugging)
+logger.gsrformat.name = org.apache.flink.formats.avro.glue.schema.registry
+logger.gsrformat.level = INFO
+
+# GSR SDK
+logger.gsr.name = software.amazon.awssdk
+logger.gsr.level = WARN
diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/aws-e2e-setup.sh b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/aws-e2e-setup.sh
new file mode 100755
index 00000000..f3ed9fa3
--- /dev/null
+++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/aws-e2e-setup.sh
@@ -0,0 +1,110 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Setup and teardown AWS resources for avro-glue E2E tests.
+#
+# Usage:
+# ./aws-e2e-setup.sh setup # Create Kinesis stream + GSR registry
+# ./aws-e2e-setup.sh teardown # Delete Kinesis stream + GSR registry
+#
+# Environment variables (set before running):
+# AWS_REGION - AWS region (default: us-east-1)
+# KINESIS_STREAM - Kinesis stream name (default: flink-avro-glue-e2e-test)
+# GSR_REGISTRY_NAME - GSR registry name (default: flink-avro-glue-e2e-registry)
+# GSR_SCHEMA_NAME - Schema name prefix (default: flink-avro-glue-e2e-schema)
+
+set -euo pipefail
+
+AWS_REGION="${AWS_REGION:-us-east-1}"
+KINESIS_STREAM="${KINESIS_STREAM:-flink-avro-glue-e2e-test}"
+GSR_REGISTRY_NAME="${GSR_REGISTRY_NAME:-flink-avro-glue-e2e-registry}"
+GSR_SCHEMA_NAME="${GSR_SCHEMA_NAME:-flink-avro-glue-e2e-schema}"
+
+setup() {
+ echo "=== Creating Kinesis stream: ${KINESIS_STREAM} ==="
+ aws kinesis create-stream \
+ --stream-name "${KINESIS_STREAM}" \
+ --shard-count 1 \
+ --region "${AWS_REGION}" 2>/dev/null || echo "Stream may already exist"
+
+ echo "Waiting for stream to become ACTIVE..."
+ aws kinesis wait stream-exists \
+ --stream-name "${KINESIS_STREAM}" \
+ --region "${AWS_REGION}"
+
+ STREAM_ARN=$(aws kinesis describe-stream-summary \
+ --stream-name "${KINESIS_STREAM}" \
+ --region "${AWS_REGION}" \
+ --query 'StreamDescriptionSummary.StreamARN' \
+ --output text)
+
+ echo "=== Creating GSR registry: ${GSR_REGISTRY_NAME} ==="
+ aws glue create-registry \
+ --registry-name "${GSR_REGISTRY_NAME}" \
+ --region "${AWS_REGION}" 2>/dev/null || echo "Registry may already exist"
+
+ echo ""
+ echo "=== Setup complete ==="
+ echo "Export these before running the E2E test:"
+ echo " export AWS_REGION=${AWS_REGION}"
+ echo " export KINESIS_STREAM_ARN=${STREAM_ARN}"
+ echo " export GSR_REGISTRY_NAME=${GSR_REGISTRY_NAME}"
+ echo " export GSR_SCHEMA_NAME=${GSR_SCHEMA_NAME}"
+}
+
+teardown() {
+ echo "=== Deleting Kinesis stream: ${KINESIS_STREAM} ==="
+ aws kinesis delete-stream \
+ --stream-name "${KINESIS_STREAM}" \
+ --enforce-consumer-deletion \
+ --region "${AWS_REGION}" 2>/dev/null || echo "Stream may not exist"
+
+ echo "=== Deleting GSR schemas in registry: ${GSR_REGISTRY_NAME} ==="
+ # Delete all schemas in the registry before deleting the registry
+ SCHEMAS=$(aws glue list-schemas \
+ --registry-id RegistryName="${GSR_REGISTRY_NAME}" \
+ --region "${AWS_REGION}" \
+ --query 'Schemas[].SchemaName' \
+ --output text 2>/dev/null || echo "")
+
+ for schema in ${SCHEMAS}; do
+ echo " Deleting schema: ${schema}"
+ aws glue delete-schema \
+ --schema-id SchemaName="${schema}",RegistryName="${GSR_REGISTRY_NAME}" \
+ --region "${AWS_REGION}" 2>/dev/null || true
+ done
+
+ echo "=== Deleting GSR registry: ${GSR_REGISTRY_NAME} ==="
+ aws glue delete-registry \
+ --registry-id RegistryName="${GSR_REGISTRY_NAME}" \
+ --region "${AWS_REGION}" 2>/dev/null || echo "Registry may not exist"
+
+ echo "=== Teardown complete ==="
+}
+
+case "${1:-}" in
+ setup)
+ setup
+ ;;
+ teardown)
+ teardown
+ ;;
+ *)
+ echo "Usage: $0 {setup|teardown}"
+ exit 1
+ ;;
+esac
diff --git a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/log4j2-test.properties
index 835c2ec9..e0c3649c 100644
--- a/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/log4j2-test.properties
+++ b/flink-connector-aws-e2e-tests/flink-formats-avro-glue-schema-registry-e2e-tests/src/test/resources/log4j2-test.properties
@@ -26,3 +26,13 @@ appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# GSR format classes at INFO (for debugging schema fetcher)
+logger.gsrformat.name = org.apache.flink.formats.avro.glue.schema.registry
+logger.gsrformat.level = INFO
+logger.gsrformat.appenderRef.test.ref = TestLogger
+
+# E2E test class
+logger.e2e.name = org.apache.flink.glue.schema.registry.test
+logger.e2e.level = INFO
+logger.e2e.appenderRef.test.ref = TestLogger
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
index 4bfe4088..cd96ee0a 100644
--- a/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/pom.xml
@@ -39,6 +39,18 @@ under the License.
${flink.version}
provided
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
org.apache.flink
flink-avro
@@ -61,12 +73,48 @@ under the License.
${glue.schema.registry.version}
-
+
+
+ org.apache.flink
+ flink-table-runtime
+ ${flink.version}
+ test
+
+
+ org.apache.flink
+ flink-table-api-java
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ test
+ test-jar
+
+
+ org.apache.flink
+ flink-avro
+ ${flink.version}
+ test
+ test-jar
+
+
org.apache.flink
flink-architecture-tests-test
test
+
+
+
+ net.jqwik
+ jqwik
+ 1.8.2
+ test
+
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java
new file mode 100644
index 00000000..f889f803
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+
+/**
+ * Avro-specific configuration options for the AWS Glue Schema Registry Avro format factory.
+ *
+ * Shared options (aws.region, registry.name, schema.name, etc.) are defined in {@link
+ * GlueFormatOptions}.
+ */
+@PublicEvolving
+public class AvroGlueFormatOptions extends GlueFormatOptions {
+
+ public static final ConfigOption SCHEMA_TYPE =
+ ConfigOptions.key("schema.type")
+ .enumType(AvroRecordType.class)
+ .defaultValue(AvroRecordType.GENERIC_RECORD)
+ .withDescription("Avro record type. Defaults to GENERIC_RECORD.");
+
+ public static final ConfigOption AVRO_NAMESPACE =
+ ConfigOptions.key("avro.namespace")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Override the namespace in the auto-generated Avro schema. "
+ + "Use this to match schemas already registered in GSR with a different namespace.");
+
+ public static final ConfigOption AVRO_RECORD_NAME =
+ ConfigOptions.key("avro.record-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Override the record name in the auto-generated Avro schema. "
+ + "Use this to match schemas already registered in GSR with a different record name.");
+
+ public static final ConfigOption SCHEMA_FETCH_FROM_REGISTRY =
+ ConfigOptions.key("schema.fetchFromRegistry")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to fetch the schema from GSR instead of using the auto-generated one. "
+ + "Defaults to false.");
+
+ private AvroGlueFormatOptions() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcher.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcher.java
new file mode 100644
index 00000000..65a23086
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcher.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.avro.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class that creates a new Avro {@link Schema} with overridden namespace and/or record name
+ * while preserving all field definitions from the original schema.
+ *
+ * This addresses the Avro schema namespace bug where Flink's {@code AvroSchemaConverter}
+ * auto-generates a namespace (e.g. {@code org.apache.flink.avro.generated}) that may differ from
+ * schemas already registered in AWS Glue Schema Registry.
+ *
+ *
The patcher recursively patches all nested record types to use the same namespace, ensuring
+ * consistent namespace usage throughout the schema hierarchy.
+ */
+@Internal
+public class AvroSchemaPatcher {
+
+ /**
+ * Creates a new Avro Schema with overridden namespace and/or record name, preserving all field
+ * definitions from the original schema. Recursively patches all nested record types to use the
+ * same namespace.
+ *
+ * @param original the original Avro schema (can be a RECORD or UNION type)
+ * @param namespace the namespace override, or {@code null} to keep the original namespace
+ * @param recordName the record name override, or {@code null} to keep the original record name
+ * @return a new Schema with the overridden namespace/name and the same fields as the original
+ */
+ public static Schema patchSchema(
+ Schema original, @Nullable String namespace, @Nullable String recordName) {
+ if (namespace == null && recordName == null) {
+ return original;
+ }
+
+ // Track already-patched schemas to handle recursive references
+ Map patchedSchemas = new HashMap<>();
+
+ // Special handling for top-level UNION: apply recordName to the main RECORD in the union
+ if (original.getType() == Schema.Type.UNION) {
+ return patchTopLevelUnion(original, namespace, recordName, patchedSchemas);
+ }
+
+ return patchSchemaRecursive(original, namespace, recordName, patchedSchemas);
+ }
+
+ /**
+ * Patches a top-level UNION schema, applying the recordName override to the main RECORD type.
+ * This handles the common case where AvroSchemaConverter produces ["null", record] unions.
+ */
+ private static Schema patchTopLevelUnion(
+ Schema unionSchema,
+ @Nullable String namespace,
+ @Nullable String recordName,
+ Map patchedSchemas) {
+
+ List patchedTypes = new ArrayList<>();
+ boolean recordNameApplied = false;
+
+ for (Schema unionType : unionSchema.getTypes()) {
+ if (unionType.getType() == Schema.Type.RECORD && !recordNameApplied) {
+ // Apply recordName to the first RECORD in the union
+ patchedTypes.add(
+ patchSchemaRecursive(unionType, namespace, recordName, patchedSchemas));
+ recordNameApplied = true;
+ } else {
+ // For other types (null, primitives, nested records), only apply namespace
+ patchedTypes.add(
+ patchSchemaRecursive(unionType, namespace, null, patchedSchemas));
+ }
+ }
+ return Schema.createUnion(patchedTypes);
+ }
+
+ private static Schema patchSchemaRecursive(
+ Schema schema,
+ @Nullable String namespace,
+ @Nullable String recordName,
+ Map patchedSchemas) {
+
+ switch (schema.getType()) {
+ case RECORD:
+ return patchRecordSchema(schema, namespace, recordName, patchedSchemas);
+
+ case ARRAY:
+ Schema patchedElement =
+ patchSchemaRecursive(
+ schema.getElementType(), namespace, null, patchedSchemas);
+ return Schema.createArray(patchedElement);
+
+ case MAP:
+ Schema patchedValue =
+ patchSchemaRecursive(
+ schema.getValueType(), namespace, null, patchedSchemas);
+ return Schema.createMap(patchedValue);
+
+ case UNION:
+ List patchedTypes = new ArrayList<>();
+ for (Schema unionType : schema.getTypes()) {
+ patchedTypes.add(
+ patchSchemaRecursive(unionType, namespace, null, patchedSchemas));
+ }
+ return Schema.createUnion(patchedTypes);
+
+ default:
+ // Primitive types and other types don't need patching
+ return schema;
+ }
+ }
+
+ private static Schema patchRecordSchema(
+ Schema original,
+ @Nullable String namespace,
+ @Nullable String recordName,
+ Map patchedSchemas) {
+
+ String originalFullName = original.getFullName();
+
+ // Check if we've already patched this schema (handles recursive references)
+ if (patchedSchemas.containsKey(originalFullName)) {
+ return patchedSchemas.get(originalFullName);
+ }
+
+ String effectiveNamespace = namespace != null ? namespace : original.getNamespace();
+ String effectiveName = recordName != null ? recordName : original.getName();
+
+ // Create the new schema first (without fields) to handle recursive references
+ Schema patched =
+ Schema.createRecord(
+ effectiveName,
+ original.getDoc(),
+ effectiveNamespace,
+ original.isError());
+
+ // Register before processing fields to handle self-references
+ patchedSchemas.put(originalFullName, patched);
+
+ // Now patch all fields recursively
+ List patchedFields =
+ original.getFields().stream()
+ .map(
+ f -> {
+ Schema patchedFieldSchema =
+ patchSchemaRecursive(
+ f.schema(), namespace, null, patchedSchemas);
+ return new Schema.Field(
+ f.name(),
+ patchedFieldSchema,
+ f.doc(),
+ f.defaultVal());
+ })
+ .collect(Collectors.toList());
+
+ patched.setFields(patchedFields);
+ return patched;
+ }
+
+ private AvroSchemaPatcher() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolver.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolver.java
new file mode 100644
index 00000000..4dbbc180
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolver.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+/**
+ * Resolves the Avro schema to use for GSR serialization/deserialization based on the configured
+ * options.
+ *
+ * Resolution flow:
+ *
+ *
+ * - If {@code schema.fetchFromRegistry} is true, attempt to fetch the schema from GSR
+ *
- If fetch fails or is disabled, check for {@code avro.namespace} / {@code avro.record-name}
+ * overrides and patch the auto-generated schema
+ *
- Otherwise, use the auto-generated schema unchanged
+ *
+ */
+@Internal
+public class AvroSchemaResolver {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AvroSchemaResolver.class);
+
+ /**
+ * Resolves the Avro schema based on the format options and the auto-generated schema.
+ *
+ * @param autoGeneratedSchema the Avro schema auto-generated from the Flink RowType via {@code
+ * AvroSchemaConverter.convertToSchema(RowType)}
+ * @param formatOptions the Flink format options from SQL DDL
+ * @param schemaFetcher optional callback to fetch schema from GSR; may be null if fetch is not
+ * supported
+ * @return the resolved Avro schema
+ */
+ public static Schema resolveSchema(
+ Schema autoGeneratedSchema,
+ ReadableConfig formatOptions,
+ @Nullable SchemaFetcher schemaFetcher) {
+
+ boolean fetchFromRegistry =
+ formatOptions.get(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY);
+
+ if (fetchFromRegistry && schemaFetcher != null) {
+ String schemaName = formatOptions.get(GlueFormatOptions.SCHEMA_NAME);
+ String registryName = formatOptions.get(GlueFormatOptions.REGISTRY_NAME);
+ try {
+ Schema fetched = schemaFetcher.fetchSchema(registryName, schemaName);
+ if (fetched != null) {
+ return fetched;
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to fetch schema '{}' from registry '{}', "
+ + "falling back to auto-generated schema.",
+ schemaName,
+ registryName,
+ e);
+ }
+ }
+
+ // Apply namespace/record-name patching if configured
+ String namespace =
+ formatOptions.getOptional(AvroGlueFormatOptions.AVRO_NAMESPACE).orElse(null);
+ String recordName =
+ formatOptions.getOptional(AvroGlueFormatOptions.AVRO_RECORD_NAME).orElse(null);
+
+ return AvroSchemaPatcher.patchSchema(autoGeneratedSchema, namespace, recordName);
+ }
+
+ /**
+ * Functional interface for fetching a schema from GSR. This allows the format factory to inject
+ * the actual GSR client call without coupling this resolver to the GSR SDK directly.
+ */
+ @FunctionalInterface
+ public interface SchemaFetcher {
+ /**
+ * Fetches the latest schema version from GSR.
+ *
+ * @param registryName the registry name
+ * @param schemaName the schema name
+ * @return the fetched Avro Schema, or null if not found
+ * @throws Exception if the fetch fails
+ */
+ @Nullable
+ Schema fetchSchema(String registryName, String schemaName) throws Exception;
+ }
+
+ private AvroSchemaResolver() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilder.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilder.java
new file mode 100644
index 00000000..6a3e6861
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class that builds the {@code Map} configuration required by the AWS Glue
+ * Schema Registry SDK from Flink's {@link ReadableConfig} format options.
+ */
+@Internal
+public class GlueFormatConfigBuilder {
+
+ /**
+ * Builds a GSR SDK configuration map from the shared {@link GlueFormatOptions}.
+ *
+ * @param formatOptions the Flink format options from SQL DDL
+ * @return a map of GSR SDK configuration keys to their values
+ */
+ public static Map buildConfigMap(ReadableConfig formatOptions) {
+ final Map properties = new HashMap<>();
+
+ formatOptions
+ .getOptional(GlueFormatOptions.AWS_REGION)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_REGION, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.AWS_ENDPOINT)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.REGISTRY_NAME)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.SCHEMA_NAME)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.CACHE_SIZE)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.CACHE_TTL_MS)
+ .ifPresent(
+ v ->
+ properties.put(
+ AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.SCHEMA_AUTO_REGISTRATION)
+ .ifPresent(
+ v ->
+ properties.put(
+ AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING,
+ v));
+ formatOptions
+ .getOptional(GlueFormatOptions.SCHEMA_COMPATIBILITY)
+ .ifPresent(
+ v -> properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v));
+ formatOptions
+ .getOptional(GlueFormatOptions.SCHEMA_COMPRESSION)
+ .ifPresent(v -> properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v));
+
+ return properties;
+ }
+
+ private GlueFormatConfigBuilder() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatOptions.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatOptions.java
new file mode 100644
index 00000000..8703e177
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatOptions.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+import java.time.Duration;
+
+/**
+ * Base configuration options shared across all AWS Glue Schema Registry format factories (Avro,
+ * JSON, Protobuf).
+ */
+@PublicEvolving
+public class GlueFormatOptions {
+
+ public static final ConfigOption AWS_REGION =
+ ConfigOptions.key("aws.region")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("AWS region for the Glue Schema Registry.");
+
+ public static final ConfigOption AWS_ENDPOINT =
+ ConfigOptions.key("aws.endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Custom AWS endpoint URL.");
+
+ public static final ConfigOption REGISTRY_NAME =
+ ConfigOptions.key("registry.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Name of the Glue Schema Registry.");
+
+ public static final ConfigOption SCHEMA_NAME =
+ ConfigOptions.key("schema.name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Schema name under which to register/look up the schema in Glue Schema Registry.");
+
+ public static final ConfigOption CACHE_SIZE =
+ ConfigOptions.key("cache.size")
+ .intType()
+ .defaultValue(200)
+ .withDescription("Maximum number of items in the schema cache. Defaults to 200.");
+
+ public static final ConfigOption CACHE_TTL_MS =
+ ConfigOptions.key("cache.ttlMs")
+ .longType()
+ .defaultValue(Duration.ofDays(1L).toMillis())
+ .withDescription("Cache TTL in milliseconds. Defaults to 1 day.");
+
+ public static final ConfigOption SCHEMA_AUTO_REGISTRATION =
+ ConfigOptions.key("schema.autoRegistration")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to auto-register schemas with Glue Schema Registry. Defaults to false.");
+
+ public static final ConfigOption SCHEMA_COMPATIBILITY =
+ ConfigOptions.key("schema.compatibility")
+ .enumType(Compatibility.class)
+ .defaultValue(AWSSchemaRegistryConstants.DEFAULT_COMPATIBILITY_SETTING)
+ .withDescription("Schema compatibility mode for Glue Schema Registry.");
+
+ public static final ConfigOption SCHEMA_COMPRESSION =
+ ConfigOptions.key("schema.compression")
+ .enumType(AWSSchemaRegistryConstants.COMPRESSION.class)
+ .defaultValue(AWSSchemaRegistryConstants.COMPRESSION.NONE)
+ .withDescription("Compression type for schema data. Defaults to NONE.");
+
+ protected GlueFormatOptions() {}
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
new file mode 100644
index 00000000..fe1870e5
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Table format factory for providing configured instances of AWS Glue Schema Registry Avro to
+ * RowData {@link SerializationSchema} and {@link DeserializationSchema}.
+ *
+ * This factory supports:
+ *
+ *
+ * - SPI discovery via identifier {@code avro-glue}
+ *
- Projection pushdown via {@link ProjectableDecodingFormat}
+ *
- Schema namespace/record-name patching via {@link AvroSchemaPatcher}
+ *
- Schema fetching from GSR via {@link AvroSchemaResolver}
+ *
+ */
+@Internal
+public class GlueSchemaRegistryAvroFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "avro-glue";
+
+ @Override
+ public DecodingFormat> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ return new ProjectableDecodingFormat>() {
+ @Override
+ public DeserializationSchema createRuntimeDecoder(
+ DynamicTableSource.Context context,
+ DataType producedDataType,
+ int[][] projections) {
+ producedDataType = Projection.of(projections).project(producedDataType);
+ final RowType rowType = (RowType) producedDataType.getLogicalType();
+ final TypeInformation rowDataTypeInfo =
+ context.createTypeInformation(producedDataType);
+ final Schema autoGeneratedSchema =
+ AvroSchemaConverter.convertToSchema(rowType);
+ final Map configMap =
+ GlueFormatConfigBuilder.buildConfigMap(formatOptions);
+ final AvroSchemaResolver.SchemaFetcher schemaFetcher =
+ createSchemaFetcherIfEnabled(formatOptions, configMap);
+ final Schema resolvedSchema =
+ AvroSchemaResolver.resolveSchema(
+ autoGeneratedSchema, formatOptions, schemaFetcher);
+ return new AvroRowDataDeserializationSchema(
+ GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+ resolvedSchema, configMap),
+ AvroToRowDataConverters.createRowConverter(rowType),
+ rowDataTypeInfo);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat> createEncodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ return new EncodingFormat>() {
+ @Override
+ public SerializationSchema createRuntimeEncoder(
+ DynamicTableSink.Context context, DataType consumedDataType) {
+ final RowType rowType = (RowType) consumedDataType.getLogicalType();
+ final Schema autoGeneratedSchema =
+ AvroSchemaConverter.convertToSchema(rowType);
+ final Map configMap =
+ GlueFormatConfigBuilder.buildConfigMap(formatOptions);
+ final AvroSchemaResolver.SchemaFetcher schemaFetcher =
+ createSchemaFetcherIfEnabled(formatOptions, configMap);
+ final Schema resolvedSchema =
+ AvroSchemaResolver.resolveSchema(
+ autoGeneratedSchema, formatOptions, schemaFetcher);
+ final String transportName = formatOptions.get(GlueFormatOptions.SCHEMA_NAME);
+ return new AvroRowDataSerializationSchema(
+ rowType,
+ GlueSchemaRegistryAvroSerializationSchema.forGeneric(
+ resolvedSchema, transportName, configMap),
+ RowDataToAvroConverters.createConverter(rowType));
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ Set> options = new HashSet<>();
+ options.add(GlueFormatOptions.AWS_REGION);
+ options.add(GlueFormatOptions.REGISTRY_NAME);
+ options.add(GlueFormatOptions.SCHEMA_NAME);
+ return options;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ Set> options = new HashSet<>();
+ options.add(GlueFormatOptions.AWS_ENDPOINT);
+ options.add(GlueFormatOptions.CACHE_SIZE);
+ options.add(GlueFormatOptions.CACHE_TTL_MS);
+ options.add(GlueFormatOptions.SCHEMA_AUTO_REGISTRATION);
+ options.add(GlueFormatOptions.SCHEMA_COMPATIBILITY);
+ options.add(GlueFormatOptions.SCHEMA_COMPRESSION);
+ options.add(AvroGlueFormatOptions.SCHEMA_TYPE);
+ options.add(AvroGlueFormatOptions.AVRO_NAMESPACE);
+ options.add(AvroGlueFormatOptions.AVRO_RECORD_NAME);
+ options.add(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY);
+ return options;
+ }
+
+ @Override
+ public Set> forwardOptions() {
+ return Stream.of(
+ GlueFormatOptions.AWS_REGION,
+ GlueFormatOptions.AWS_ENDPOINT,
+ GlueFormatOptions.REGISTRY_NAME,
+ GlueFormatOptions.SCHEMA_NAME,
+ GlueFormatOptions.CACHE_SIZE,
+ GlueFormatOptions.CACHE_TTL_MS,
+ GlueFormatOptions.SCHEMA_AUTO_REGISTRATION,
+ GlueFormatOptions.SCHEMA_COMPATIBILITY,
+ GlueFormatOptions.SCHEMA_COMPRESSION,
+ AvroGlueFormatOptions.SCHEMA_TYPE,
+ AvroGlueFormatOptions.AVRO_NAMESPACE,
+ AvroGlueFormatOptions.AVRO_RECORD_NAME,
+ AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Creates a SchemaFetcher if fetchFromRegistry is enabled.
+ *
+ * @param formatOptions the format options
+ * @param configMap the GSR config map
+ * @return a SchemaFetcher instance, or null if fetchFromRegistry is disabled
+ */
+ private static AvroSchemaResolver.SchemaFetcher createSchemaFetcherIfEnabled(
+ ReadableConfig formatOptions, Map configMap) {
+ boolean fetchFromRegistry =
+ formatOptions.get(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY);
+ if (fetchFromRegistry) {
+ return new GlueSchemaRegistrySchemaFetcher(configMap);
+ }
+ return null;
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistrySchemaFetcher.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistrySchemaFetcher.java
new file mode 100644
index 00000000..2dacfe38
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistrySchemaFetcher.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.annotation.Internal;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueClientBuilder;
+import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
+import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
+import software.amazon.awssdk.services.glue.model.SchemaId;
+import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * Fetches the latest Avro schema from AWS Glue Schema Registry using the Glue SDK.
+ *
+ * Used when {@code schema.fetchFromRegistry=true} to resolve the actual schema from GSR instead
+ * of relying on the auto-generated Flink schema (which defaults to namespace {@code
+ * org.apache.flink.avro.generated} and record name {@code record}).
+ */
+@Internal
+public class GlueSchemaRegistrySchemaFetcher implements AvroSchemaResolver.SchemaFetcher {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(GlueSchemaRegistrySchemaFetcher.class);
+
+ private final GlueClient glueClient;
+
+ public GlueSchemaRegistrySchemaFetcher(Map configMap) {
+ String region = (String) configMap.get(AWSSchemaRegistryConstants.AWS_REGION);
+ GlueClientBuilder builder = GlueClient.builder();
+ if (region != null) {
+ builder.region(Region.of(region));
+ }
+ Object endpoint = configMap.get(AWSSchemaRegistryConstants.AWS_ENDPOINT);
+ if (endpoint != null) {
+ builder.endpointOverride(URI.create(endpoint.toString()));
+ }
+ this.glueClient = builder.build();
+ LOG.debug("GlueSchemaRegistrySchemaFetcher initialized for region: {}", region);
+ }
+
+ /** Package-private constructor for testing with a pre-built GlueClient. */
+ GlueSchemaRegistrySchemaFetcher(GlueClient glueClient) {
+ this.glueClient = glueClient;
+ }
+
+ @Override
+ public Schema fetchSchema(String registryName, String schemaName) throws Exception {
+ LOG.debug(
+ "Fetching schema from GSR - registry: '{}', schema: '{}'",
+ registryName,
+ schemaName);
+
+ GetSchemaVersionRequest request =
+ GetSchemaVersionRequest.builder()
+ .schemaId(
+ SchemaId.builder()
+ .registryName(registryName)
+ .schemaName(schemaName)
+ .build())
+ .schemaVersionNumber(
+ SchemaVersionNumber.builder().latestVersion(true).build())
+ .build();
+
+ GetSchemaVersionResponse response = glueClient.getSchemaVersion(request);
+ String schemaDefinition = response.schemaDefinition();
+
+ if (schemaDefinition == null || schemaDefinition.isEmpty()) {
+ LOG.warn(
+ "Schema definition is null or empty for registry: '{}', schema: '{}'",
+ registryName,
+ schemaName);
+ return null;
+ }
+
+ Schema schema = new Schema.Parser().parse(schemaDefinition);
+ LOG.debug(
+ "Fetched schema - namespace: '{}', name: '{}', fields: {}",
+ schema.getNamespace(),
+ schema.getName(),
+ schema.getFields().size());
+
+ return schema;
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..2b14a190
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroFormatFactory
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripIntegrationTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripIntegrationTest.java
new file mode 100644
index 00000000..4138e1b3
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripIntegrationTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.SchemaCoder;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for Avro round-trip serialization/deserialization with mock GSR facades.
+ *
+ * Validates Requirements 8.1, 8.3.
+ */
+class AvroRoundTripIntegrationTest {
+
+ private MockGlueSchemaRegistryFacades mockFacades;
+ private Map configs;
+
+ @BeforeEach
+ void setUp() {
+ mockFacades = new MockGlueSchemaRegistryFacades();
+ configs = new HashMap<>();
+ configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
+ configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
+ configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "test-schema");
+ }
+
+ /**
+ * Tests basic Avro round-trip: RowData → serialize → deserialize → RowData.
+ * Requirement 8.1.
+ */
+ @Test
+ void testBasicAvroRoundTrip() throws Exception {
+ RowType rowType =
+ new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField(
+ "name", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("age", new IntType())));
+
+ Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
+
+ // Build mock SchemaCoder using mock facades
+ GlueSchemaRegistryOutputStreamSerializer mockSerializer =
+ mockFacades.createMockOutputStreamSerializer("test-topic", configs);
+ GlueSchemaRegistryInputStreamDeserializer mockDeserializer =
+ mockFacades.createMockInputStreamDeserializer();
+
+ SchemaCoder serCoder = new GlueSchemaRegistryAvroSchemaCoder(mockSerializer);
+ SchemaCoder deserCoder = new GlueSchemaRegistryAvroSchemaCoder(mockDeserializer);
+
+ // Create ser/deser schemas
+ GlueSchemaRegistryAvroSerializationSchema
+ gsrAvroSer =
+ new GlueSchemaRegistryAvroSerializationSchema<>(
+ org.apache.avro.generic.GenericRecord.class,
+ avroSchema,
+ serCoder);
+
+ AvroRowDataSerializationSchema serSchema =
+ new AvroRowDataSerializationSchema(
+ rowType,
+ gsrAvroSer,
+ RowDataToAvroConverters.createConverter(rowType));
+
+ AvroRowDataDeserializationSchema deserSchema =
+ new AvroRowDataDeserializationSchema(
+ createDeserSchemaWithMockCoder(avroSchema, deserCoder),
+ AvroToRowDataConverters.createRowConverter(rowType),
+ InternalTypeInfo.of(rowType));
+
+ // Open schemas
+ serSchema.open(null);
+ deserSchema.open(null);
+
+ // Create test RowData
+ GenericRowData original = new GenericRowData(2);
+ original.setField(0, StringData.fromString("Alice"));
+ original.setField(1, 30);
+
+ // Serialize
+ byte[] serialized = serSchema.serialize(original);
+ assertThat(serialized).isNotNull();
+ assertThat(serialized.length).isGreaterThan(MockGlueSchemaRegistryFacades.GSR_HEADER_SIZE);
+
+ // Deserialize
+ RowData deserialized = deserSchema.deserialize(serialized);
+ assertThat(deserialized).isNotNull();
+ assertThat(deserialized.getString(0).toString()).isEqualTo("Alice");
+ assertThat(deserialized.getInt(1)).isEqualTo(30);
+ }
+
+ /**
+ * Tests namespace bug scenario: serialize with avro.namespace override.
+ * Pre-register schema with custom namespace, then serialize with patched schema.
+ * Requirement 8.3.
+ */
+ @Test
+ void testNamespaceBugScenario() throws Exception {
+ RowType rowType =
+ new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField(
+ "name", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("age", new IntType())));
+
+ // Auto-generated schema has namespace "org.apache.flink.avro.generated"
+ Schema autoGenerated = AvroSchemaConverter.convertToSchema(rowType);
+ assertThat(autoGenerated.getNamespace()).isEqualTo("org.apache.flink.avro.generated");
+
+ // Patch schema with custom namespace (simulating avro.namespace option)
+ String customNamespace = "com.example.myapp";
+ Schema patchedSchema = AvroSchemaPatcher.patchSchema(autoGenerated, customNamespace, null);
+ assertThat(patchedSchema.getNamespace()).isEqualTo(customNamespace);
+ assertThat(patchedSchema.getFields()).hasSameSizeAs(autoGenerated.getFields());
+
+ // Build mock SchemaCoder with patched schema
+ GlueSchemaRegistryOutputStreamSerializer mockSerializer =
+ mockFacades.createMockOutputStreamSerializer("test-topic", configs);
+ GlueSchemaRegistryInputStreamDeserializer mockDeserializer =
+ mockFacades.createMockInputStreamDeserializer();
+
+ SchemaCoder serCoder = new GlueSchemaRegistryAvroSchemaCoder(mockSerializer);
+ SchemaCoder deserCoder = new GlueSchemaRegistryAvroSchemaCoder(mockDeserializer);
+
+ // Create ser schema with patched schema
+ GlueSchemaRegistryAvroSerializationSchema
+ gsrAvroSer =
+ new GlueSchemaRegistryAvroSerializationSchema<>(
+ org.apache.avro.generic.GenericRecord.class,
+ patchedSchema,
+ serCoder);
+
+ AvroRowDataSerializationSchema serSchema =
+ new AvroRowDataSerializationSchema(
+ rowType,
+ gsrAvroSer,
+ RowDataToAvroConverters.createConverter(rowType));
+
+ // Create deser schema with patched schema
+ AvroRowDataDeserializationSchema deserSchema =
+ new AvroRowDataDeserializationSchema(
+ createDeserSchemaWithMockCoder(patchedSchema, deserCoder),
+ AvroToRowDataConverters.createRowConverter(rowType),
+ InternalTypeInfo.of(rowType));
+
+ serSchema.open(null);
+ deserSchema.open(null);
+
+ // Create test RowData
+ GenericRowData original = new GenericRowData(2);
+ original.setField(0, StringData.fromString("Bob"));
+ original.setField(1, 25);
+
+ // Serialize with patched schema
+ byte[] serialized = serSchema.serialize(original);
+ assertThat(serialized).isNotNull();
+
+ // Deserialize — should succeed despite different namespace
+ RowData deserialized = deserSchema.deserialize(serialized);
+ assertThat(deserialized).isNotNull();
+ assertThat(deserialized.getString(0).toString()).isEqualTo("Bob");
+ assertThat(deserialized.getInt(1)).isEqualTo(25);
+ }
+
+ /**
+ * Creates a GlueSchemaRegistryAvroDeserializationSchema with a mock SchemaCoder
+ * injected via reflection (the schemaCoder field is private in the parent class).
+ */
+ private static GlueSchemaRegistryAvroDeserializationSchema
+ createDeserSchemaWithMockCoder(Schema schema, SchemaCoder mockCoder) {
+ // Create a real deser schema (configs won't be used since we override schemaCoder)
+ Map dummyConfigs = new HashMap<>();
+ dummyConfigs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
+ GlueSchemaRegistryAvroDeserializationSchema
+ deserSchema =
+ GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+ schema, dummyConfigs);
+
+ // Use reflection to inject the mock SchemaCoder
+ try {
+ Class> clazz = deserSchema.getClass();
+ while (clazz != null) {
+ try {
+ Field field = clazz.getDeclaredField("schemaCoder");
+ field.setAccessible(true);
+ field.set(deserSchema, mockCoder);
+ return deserSchema;
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ throw new RuntimeException("Could not find schemaCoder field");
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Failed to inject mock SchemaCoder", e);
+ }
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripPropertyTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripPropertyTest.java
new file mode 100644
index 00000000..62d49a50
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroRoundTripPropertyTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.SchemaCoder;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.Provide;
+import net.jqwik.api.Tag;
+import org.apache.avro.Schema;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Property-based tests for Avro serialization round-trip with mock GSR facades.
+ *
+ * Property 3: Avro serialization round-trip
+ *
+ *
Validates: Requirements 1.4, 1.5, 8.1
+ */
+@Tag("Feature: gsr-flink-sql-formats, Property 3: Avro serialization round-trip")
+class AvroRoundTripPropertyTest {
+
+ /**
+ * For any valid RowData matching a given RowType, serializing via the Avro encoding format
+ * (with mock GSR facades) and then deserializing should produce equivalent RowData.
+ */
+ @Property(tries = 100)
+ void avroRoundTripPreservesData(@ForAll("rowDataWithType") RowDataWithType input)
+ throws Exception {
+ RowType rowType = input.rowType;
+ RowData original = input.rowData;
+
+ Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
+
+ // Create mock facades
+ MockGlueSchemaRegistryFacades mockFacades = new MockGlueSchemaRegistryFacades();
+ Map configs = new HashMap<>();
+ configs.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
+ configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
+ configs.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "test-schema");
+
+ GlueSchemaRegistryOutputStreamSerializer mockSerializer =
+ mockFacades.createMockOutputStreamSerializer("test-topic", configs);
+ GlueSchemaRegistryInputStreamDeserializer mockDeserializer =
+ mockFacades.createMockInputStreamDeserializer();
+
+ SchemaCoder serCoder = new GlueSchemaRegistryAvroSchemaCoder(mockSerializer);
+ SchemaCoder deserCoder = new GlueSchemaRegistryAvroSchemaCoder(mockDeserializer);
+
+ // Create serialization schema
+ GlueSchemaRegistryAvroSerializationSchema
+ gsrAvroSer =
+ new GlueSchemaRegistryAvroSerializationSchema<>(
+ org.apache.avro.generic.GenericRecord.class,
+ avroSchema,
+ serCoder);
+
+ AvroRowDataSerializationSchema serSchema =
+ new AvroRowDataSerializationSchema(
+ rowType,
+ gsrAvroSer,
+ RowDataToAvroConverters.createConverter(rowType));
+
+ // Create deserialization schema with mock coder via reflection
+ GlueSchemaRegistryAvroDeserializationSchema
+ gsrAvroDe =
+ GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+ avroSchema, configs);
+ injectSchemaCoder(gsrAvroDe, deserCoder);
+
+ AvroRowDataDeserializationSchema deserSchema =
+ new AvroRowDataDeserializationSchema(
+ gsrAvroDe,
+ AvroToRowDataConverters.createRowConverter(rowType),
+ InternalTypeInfo.of(rowType));
+
+ serSchema.open(null);
+ deserSchema.open(null);
+
+ // Serialize
+ byte[] serialized = serSchema.serialize(original);
+ assertThat(serialized).isNotNull();
+
+ // Deserialize
+ RowData deserialized = deserSchema.deserialize(serialized);
+ assertThat(deserialized).isNotNull();
+
+ // Verify equivalence field by field
+ assertRowDataEquals(original, deserialized, rowType);
+ }
+
+ /** Injects a mock SchemaCoder into a deser schema via reflection. */
+ private static void injectSchemaCoder(Object target, SchemaCoder coder) {
+ try {
+ Class> clazz = target.getClass();
+ while (clazz != null) {
+ try {
+ Field field = clazz.getDeclaredField("schemaCoder");
+ field.setAccessible(true);
+ field.set(target, coder);
+ return;
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ throw new RuntimeException("Could not find schemaCoder field");
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Failed to inject mock SchemaCoder", e);
+ }
+ }
+
+ /** Compares two RowData instances field by field based on the RowType. */
+ private void assertRowDataEquals(RowData expected, RowData actual, RowType rowType) {
+ assertThat(actual.getArity()).isEqualTo(expected.getArity());
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ LogicalType fieldType = rowType.getTypeAt(i);
+ // Avro treats null strings as null, null ints/bools/doubles as null
+ if (expected.isNullAt(i)) {
+ assertThat(actual.isNullAt(i)).isTrue();
+ continue;
+ }
+ if (fieldType instanceof VarCharType) {
+ assertThat(actual.getString(i).toString())
+ .isEqualTo(expected.getString(i).toString());
+ } else if (fieldType instanceof IntType) {
+ assertThat(actual.getInt(i)).isEqualTo(expected.getInt(i));
+ } else if (fieldType instanceof BooleanType) {
+ assertThat(actual.getBoolean(i)).isEqualTo(expected.getBoolean(i));
+ } else if (fieldType instanceof DoubleType) {
+ assertThat(actual.getDouble(i)).isEqualTo(expected.getDouble(i));
+ }
+ }
+ }
+
+ // --- Generators ---
+
+ @Provide
+ Arbitrary rowDataWithType() {
+ // Fixed schema with STRING, INT, BOOLEAN, DOUBLE fields
+ RowType rowType =
+ new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField(
+ "name", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("age", new IntType()),
+ new RowType.RowField("active", new BooleanType()),
+ new RowType.RowField("score", new DoubleType())));
+
+ return Arbitraries.of(rowType)
+ .flatMap(rt -> generateRowData(rt).map(rd -> new RowDataWithType(rt, rd)));
+ }
+
+ private Arbitrary generateRowData(RowType rowType) {
+ Arbitrary strings =
+ Arbitraries.strings().alpha().ofMinLength(0).ofMaxLength(50);
+ Arbitrary ints = Arbitraries.integers().between(-10000, 10000);
+ Arbitrary bools = Arbitraries.of(true, false);
+ Arbitrary doubles =
+ Arbitraries.doubles().between(-1e6, 1e6).ofScale(4);
+
+ return strings.flatMap(
+ name ->
+ ints.flatMap(
+ age ->
+ bools.flatMap(
+ active ->
+ doubles.map(
+ score -> {
+ GenericRowData row =
+ new GenericRowData(4);
+ row.setField(
+ 0,
+ StringData.fromString(
+ name));
+ row.setField(1, age);
+ row.setField(2, active);
+ row.setField(3, score);
+ return (RowData) row;
+ }))));
+ }
+
+ /** Holder for a RowData and its corresponding RowType. */
+ static class RowDataWithType {
+ final RowType rowType;
+ final RowData rowData;
+
+ RowDataWithType(RowType rowType, RowData rowData) {
+ this.rowType = rowType;
+ this.rowData = rowData;
+ }
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcherPropertyTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcherPropertyTest.java
new file mode 100644
index 00000000..a0cd5cde
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaPatcherPropertyTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.Provide;
+import net.jqwik.api.Tag;
+import net.jqwik.api.Tuple;
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Property-based tests for {@link AvroSchemaPatcher}.
+ *
+ * Property 2: Avro schema patching preserves fields with overridden namespace and name
+ *
+ *
Validates: Requirements 2.3, 2.4, 2.9
+ */
+@Tag("Feature: gsr-flink-sql-formats, Property 2: Avro schema patching preserves fields with"
+ + " overridden namespace and name")
+class AvroSchemaPatcherPropertyTest {
+
+ /**
+ * For any valid Flink RowType and for any non-empty namespace and record name strings, patching
+ * the auto-generated Avro schema should produce a schema where the namespace and record name
+ * match the provided values, and all fields are preserved (with nested records also patched).
+ */
+ @Property(tries = 100)
+ void patchSchemaPreservesFieldsWithOverriddenNamespaceAndName(
+ @ForAll("rowTypes") RowType rowType,
+ @ForAll("namespaces") String namespace,
+ @ForAll("recordNames") String recordName) {
+
+ Schema original = AvroSchemaConverter.convertToSchema(rowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, namespace, recordName);
+
+ assertThat(patched.getNamespace()).isEqualTo(namespace);
+ assertThat(patched.getName()).isEqualTo(recordName);
+ assertThat(patched.getFields()).hasSameSizeAs(original.getFields());
+
+ for (int i = 0; i < original.getFields().size(); i++) {
+ Schema.Field originalField = original.getFields().get(i);
+ Schema.Field patchedField = patched.getFields().get(i);
+ assertThat(patchedField.name()).isEqualTo(originalField.name());
+ // For nested records, the schema will be patched too, so we verify structure
+ assertSchemaStructureMatches(originalField.schema(), patchedField.schema(), namespace);
+ }
+ }
+
+ /**
+ * Verifies that the patched schema has the same structure as the original,
+ * with nested records having the new namespace.
+ */
+ private void assertSchemaStructureMatches(Schema original, Schema patched, String expectedNamespace) {
+ assertThat(patched.getType()).isEqualTo(original.getType());
+
+ switch (original.getType()) {
+ case RECORD:
+ assertThat(patched.getNamespace()).isEqualTo(expectedNamespace);
+ assertThat(patched.getFields()).hasSameSizeAs(original.getFields());
+ for (int i = 0; i < original.getFields().size(); i++) {
+ assertSchemaStructureMatches(
+ original.getFields().get(i).schema(),
+ patched.getFields().get(i).schema(),
+ expectedNamespace);
+ }
+ break;
+ case ARRAY:
+ assertSchemaStructureMatches(original.getElementType(), patched.getElementType(), expectedNamespace);
+ break;
+ case MAP:
+ assertSchemaStructureMatches(original.getValueType(), patched.getValueType(), expectedNamespace);
+ break;
+ case UNION:
+ assertThat(patched.getTypes()).hasSameSizeAs(original.getTypes());
+ for (int i = 0; i < original.getTypes().size(); i++) {
+ assertSchemaStructureMatches(
+ original.getTypes().get(i),
+ patched.getTypes().get(i),
+ expectedNamespace);
+ }
+ break;
+ default:
+ // Primitive types should be equal
+ assertThat(patched).isEqualTo(original);
+ }
+ }
+
+ /**
+ * When only namespace is provided (recordName is null), the record name should remain
+ * unchanged.
+ */
+ @Property(tries = 100)
+ void patchSchemaWithOnlyNamespacePreservesRecordName(
+ @ForAll("rowTypes") RowType rowType,
+ @ForAll("namespaces") String namespace) {
+
+ Schema original = AvroSchemaConverter.convertToSchema(rowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, namespace, null);
+
+ assertThat(patched.getNamespace()).isEqualTo(namespace);
+ assertThat(patched.getName()).isEqualTo(original.getName());
+ assertThat(patched.getFields()).hasSameSizeAs(original.getFields());
+ }
+
+ /**
+ * When only recordName is provided (namespace is null), the namespace should remain unchanged.
+ */
+ @Property(tries = 100)
+ void patchSchemaWithOnlyRecordNamePreservesNamespace(
+ @ForAll("rowTypes") RowType rowType,
+ @ForAll("recordNames") String recordName) {
+
+ Schema original = AvroSchemaConverter.convertToSchema(rowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, null, recordName);
+
+ assertThat(patched.getNamespace()).isEqualTo(original.getNamespace());
+ assertThat(patched.getName()).isEqualTo(recordName);
+ assertThat(patched.getFields()).hasSameSizeAs(original.getFields());
+ }
+
+ /**
+ * When both namespace and recordName are null, the original schema should be returned
+ * unchanged.
+ */
+ @Property(tries = 100)
+ void patchSchemaWithNullOverridesReturnsOriginal(@ForAll("rowTypes") RowType rowType) {
+
+ Schema original = AvroSchemaConverter.convertToSchema(rowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, null, null);
+
+ assertThat(patched).isSameAs(original);
+ }
+
+ @Provide
+ Arbitrary rowTypes() {
+ return rowTypesWithDepth(0);
+ }
+
+ /**
+ * Generates RowTypes with nested complex types up to a maximum depth.
+ */
+ private Arbitrary rowTypesWithDepth(int depth) {
+ Arbitrary fieldCount = Arbitraries.integers().between(1, 5);
+ return fieldCount.flatMap(
+ count -> {
+ Arbitrary> types = logicalTypesWithDepth(depth).list().ofSize(count);
+ return types.map(
+ typeList -> {
+ List fields =
+ IntStream.range(0, typeList.size())
+ .mapToObj(
+ i ->
+ new RowType.RowField(
+ "f" + i, typeList.get(i)))
+ .collect(Collectors.toList());
+ return new RowType(false, fields);
+ });
+ });
+ }
+
+ /**
+ * Generates LogicalTypes including primitives and complex types (ARRAY, MAP, ROW) with depth control.
+ */
+ private Arbitrary logicalTypesWithDepth(int depth) {
+ // Primitive types - always available
+ Arbitrary primitives = Arbitraries.of(
+ (LogicalType) new VarCharType(VarCharType.MAX_LENGTH),
+ new IntType(),
+ new BigIntType(),
+ new BooleanType(),
+ new FloatType(),
+ new DoubleType());
+
+ // At max depth, only return primitives
+ if (depth >= 2) {
+ return primitives;
+ }
+
+ // Complex types with nested structures
+ Arbitrary arrayType = logicalTypesWithDepth(depth + 1)
+ .map(ArrayType::new);
+
+ Arbitrary mapType = logicalTypesWithDepth(depth + 1)
+ .map(valueType -> new MapType(new VarCharType(VarCharType.MAX_LENGTH), valueType));
+
+ Arbitrary nestedRowType = rowTypesWithDepth(depth + 1)
+ .map(rt -> (LogicalType) rt);
+
+ // Mix primitives and complex types with higher weight on primitives
+ return Arbitraries.frequencyOf(
+ Tuple.of(6, primitives),
+ Tuple.of(1, arrayType),
+ Tuple.of(1, mapType),
+ Tuple.of(2, nestedRowType));
+ }
+
+ @Provide
+ Arbitrary namespaces() {
+ return Arbitraries.strings()
+ .withCharRange('a', 'z')
+ .ofMinLength(1)
+ .ofMaxLength(20)
+ .flatMap(
+ first ->
+ Arbitraries.strings()
+ .withCharRange('a', 'z')
+ .ofMinLength(1)
+ .ofMaxLength(20)
+ .map(second -> first + "." + second));
+ }
+
+ @Provide
+ Arbitrary recordNames() {
+ // Avro record names must start with a letter and contain only alphanumeric + underscore
+ return Arbitraries.strings()
+ .withCharRange('A', 'Z')
+ .ofLength(1)
+ .flatMap(
+ first ->
+ Arbitraries.strings()
+ .withCharRange('a', 'z')
+ .ofMinLength(1)
+ .ofMaxLength(15)
+ .map(rest -> first + rest));
+ }
+
+ /**
+ * Test that nested record types are also patched with the same namespace.
+ */
+ @Test
+ void patchSchemaRecursivelyPatchesNestedRecords() {
+ // Create a RowType with nested ROW (record within record)
+ RowType nestedRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("age", new IntType())));
+
+ RowType outerRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("id", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("customer", nestedRowType)));
+
+ Schema original = AvroSchemaConverter.convertToSchema(outerRowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, "com.example.orders", "OrderEvent");
+
+ // Verify root record is patched
+ assertThat(patched.getNamespace()).isEqualTo("com.example.orders");
+ assertThat(patched.getName()).isEqualTo("OrderEvent");
+
+ // Verify nested record is also patched with the same namespace
+ Schema customerField = patched.getField("customer").schema();
+ // Handle union type (nullable)
+ if (customerField.getType() == Schema.Type.UNION) {
+ customerField = customerField.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .orElseThrow();
+ }
+ assertThat(customerField.getNamespace()).isEqualTo("com.example.orders");
+ }
+
+ /**
+ * Test that ARRAY of records has nested records patched.
+ */
+ @Test
+ void patchSchemaRecursivelyPatchesArrayOfRecords() {
+ // Create a RowType with ARRAY>
+ RowType itemRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("product_name", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("quantity", new IntType())));
+
+ RowType outerRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("order_id", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("items", new ArrayType(itemRowType))));
+
+ Schema original = AvroSchemaConverter.convertToSchema(outerRowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, "com.example.orders", "OrderEvent");
+
+ // Verify root record is patched
+ assertThat(patched.getNamespace()).isEqualTo("com.example.orders");
+ assertThat(patched.getName()).isEqualTo("OrderEvent");
+
+ // Verify array element record is also patched
+ Schema itemsField = patched.getField("items").schema();
+ // Handle union type (nullable)
+ if (itemsField.getType() == Schema.Type.UNION) {
+ itemsField = itemsField.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.ARRAY)
+ .findFirst()
+ .orElseThrow();
+ }
+ Schema elementSchema = itemsField.getElementType();
+ // Handle union type for element
+ if (elementSchema.getType() == Schema.Type.UNION) {
+ elementSchema = elementSchema.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .orElseThrow();
+ }
+ assertThat(elementSchema.getNamespace()).isEqualTo("com.example.orders");
+ }
+
+ /**
+ * Test that MAP values with record types are patched.
+ */
+ @Test
+ void patchSchemaRecursivelyPatchesMapValues() {
+ // Create a RowType with MAP>
+ RowType valueRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("key", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("value", new IntType())));
+
+ RowType outerRowType = new RowType(
+ false,
+ Arrays.asList(
+ new RowType.RowField("id", new VarCharType(VarCharType.MAX_LENGTH)),
+ new RowType.RowField("metadata", new MapType(
+ new VarCharType(VarCharType.MAX_LENGTH), valueRowType))));
+
+ Schema original = AvroSchemaConverter.convertToSchema(outerRowType);
+ Schema patched = AvroSchemaPatcher.patchSchema(original, "com.example.data", "DataRecord");
+
+ // Verify root record is patched
+ assertThat(patched.getNamespace()).isEqualTo("com.example.data");
+ assertThat(patched.getName()).isEqualTo("DataRecord");
+
+ // Verify map value record is also patched
+ Schema metadataField = patched.getField("metadata").schema();
+ // Handle union type (nullable)
+ if (metadataField.getType() == Schema.Type.UNION) {
+ metadataField = metadataField.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.MAP)
+ .findFirst()
+ .orElseThrow();
+ }
+ Schema valueSchema = metadataField.getValueType();
+ // Handle union type for value
+ if (valueSchema.getType() == Schema.Type.UNION) {
+ valueSchema = valueSchema.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .orElseThrow();
+ }
+ assertThat(valueSchema.getNamespace()).isEqualTo("com.example.data");
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolverTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolverTest.java
new file mode 100644
index 00000000..c53f110b
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/AvroSchemaResolverTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link AvroSchemaResolver}.
+ *
+ * Tests the schema resolution flow:
+ *
+ * - If fetchFromRegistry is true and fetcher succeeds, use fetched schema
+ * - If fetchFromRegistry is true but fetcher fails, fall back to patched/auto-generated schema
+ * - If namespace/record-name overrides are provided, patch the schema
+ * - Otherwise, use the auto-generated schema unchanged
+ *
+ */
+class AvroSchemaResolverTest {
+
+ private static final RowType TEST_ROW_TYPE =
+ (RowType)
+ DataTypes.ROW(
+ DataTypes.FIELD("id", DataTypes.STRING()),
+ DataTypes.FIELD("value", DataTypes.INT()))
+ .getLogicalType();
+
+ // Note: AvroSchemaConverter.convertToSchema returns a UNION ["null", record] for nullable rows
+ private static final Schema AUTO_GENERATED_SCHEMA =
+ AvroSchemaConverter.convertToSchema(TEST_ROW_TYPE);
+
+ private static final String REGISTRY_NAME = "test-registry";
+ private static final String SCHEMA_NAME = "test-schema";
+
+ /**
+ * Extracts the RECORD schema from a potentially UNION schema.
+ * AvroSchemaConverter wraps records in ["null", record] unions.
+ */
+ private static Schema extractRecordSchema(Schema schema) {
+ if (schema.getType() == Schema.Type.UNION) {
+ return schema.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("No RECORD in UNION"));
+ }
+ return schema;
+ }
+
+ @Test
+ void testResolveSchemaWithFetchFromRegistrySuccess() {
+ // Create a mock fetched schema with custom namespace using SchemaBuilder
+ Schema fetchedSchema = SchemaBuilder.record("FetchedRecord")
+ .namespace("com.example.fetched")
+ .fields()
+ .optionalString("id")
+ .optionalInt("value")
+ .endRecord();
+
+ MockSchemaFetcher fetcher = new MockSchemaFetcher(fetchedSchema);
+
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, true);
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, fetcher);
+
+ assertThat(resolved.getNamespace()).isEqualTo("com.example.fetched");
+ assertThat(resolved.getName()).isEqualTo("FetchedRecord");
+ assertThat(fetcher.fetchCalled).isTrue();
+ assertThat(fetcher.lastRegistryName).isEqualTo(REGISTRY_NAME);
+ assertThat(fetcher.lastSchemaName).isEqualTo(SCHEMA_NAME);
+ }
+
+ @Test
+ void testResolveSchemaWithFetchFromRegistryFailure() {
+ // Fetcher that throws an exception
+ MockSchemaFetcher fetcher = new MockSchemaFetcher(new RuntimeException("GSR unavailable"));
+
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, true);
+ config.set(AvroGlueFormatOptions.AVRO_NAMESPACE, "com.example.fallback");
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, fetcher);
+
+ // Should fall back to patched schema (extract RECORD from UNION)
+ Schema resolvedRecord = extractRecordSchema(resolved);
+ assertThat(resolvedRecord.getNamespace()).isEqualTo("com.example.fallback");
+ assertThat(fetcher.fetchCalled).isTrue();
+ }
+
+ @Test
+ void testResolveSchemaWithFetchFromRegistryReturnsNull() {
+ // Fetcher that returns null (schema not found)
+ MockSchemaFetcher fetcher = new MockSchemaFetcher((Schema) null);
+
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, true);
+ config.set(AvroGlueFormatOptions.AVRO_RECORD_NAME, "FallbackRecord");
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, fetcher);
+
+ // Should fall back to patched schema (extract RECORD from UNION)
+ Schema resolvedRecord = extractRecordSchema(resolved);
+ assertThat(resolvedRecord.getName()).isEqualTo("FallbackRecord");
+ assertThat(fetcher.fetchCalled).isTrue();
+ }
+
+ @Test
+ void testResolveSchemaWithNamespaceOverrideOnly() {
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, false);
+ config.set(AvroGlueFormatOptions.AVRO_NAMESPACE, "com.example.custom");
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, null);
+
+ // Extract RECORD from UNION for assertions
+ Schema resolvedRecord = extractRecordSchema(resolved);
+ Schema originalRecord = extractRecordSchema(AUTO_GENERATED_SCHEMA);
+ assertThat(resolvedRecord.getNamespace()).isEqualTo("com.example.custom");
+ assertThat(resolvedRecord.getName()).isEqualTo(originalRecord.getName());
+ }
+
+ @Test
+ void testResolveSchemaWithRecordNameOverrideOnly() {
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, false);
+ config.set(AvroGlueFormatOptions.AVRO_RECORD_NAME, "CustomRecord");
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, null);
+
+ // Extract RECORD from UNION for assertions
+ Schema resolvedRecord = extractRecordSchema(resolved);
+ Schema originalRecord = extractRecordSchema(AUTO_GENERATED_SCHEMA);
+ assertThat(resolvedRecord.getNamespace()).isEqualTo(originalRecord.getNamespace());
+ assertThat(resolvedRecord.getName()).isEqualTo("CustomRecord");
+ }
+
+ @Test
+ void testResolveSchemaWithBothOverrides() {
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, false);
+ config.set(AvroGlueFormatOptions.AVRO_NAMESPACE, "com.example.custom");
+ config.set(AvroGlueFormatOptions.AVRO_RECORD_NAME, "CustomRecord");
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, null);
+
+ // Extract RECORD from UNION for assertions
+ Schema resolvedRecord = extractRecordSchema(resolved);
+ assertThat(resolvedRecord.getNamespace()).isEqualTo("com.example.custom");
+ assertThat(resolvedRecord.getName()).isEqualTo("CustomRecord");
+ }
+
+ @Test
+ void testResolveSchemaWithNoOverrides() {
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, false);
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, null);
+
+ // Should return the original schema unchanged
+ assertThat(resolved).isSameAs(AUTO_GENERATED_SCHEMA);
+ }
+
+ @Test
+ void testResolveSchemaWithFetchDisabledIgnoresFetcher() {
+ Schema fetchedSchema = SchemaBuilder.record("FetchedRecord")
+ .namespace("com.example.fetched")
+ .fields()
+ .optionalString("id")
+ .optionalInt("value")
+ .endRecord();
+
+ MockSchemaFetcher fetcher = new MockSchemaFetcher(fetchedSchema);
+
+ Configuration config = new Configuration();
+ config.set(GlueFormatOptions.REGISTRY_NAME, REGISTRY_NAME);
+ config.set(GlueFormatOptions.SCHEMA_NAME, SCHEMA_NAME);
+ config.set(AvroGlueFormatOptions.SCHEMA_FETCH_FROM_REGISTRY, false);
+
+ Schema resolved = AvroSchemaResolver.resolveSchema(AUTO_GENERATED_SCHEMA, config, fetcher);
+
+ // Should NOT call fetcher when fetchFromRegistry is false
+ assertThat(fetcher.fetchCalled).isFalse();
+ assertThat(resolved).isSameAs(AUTO_GENERATED_SCHEMA);
+ }
+
+ /**
+ * Mock implementation of SchemaFetcher for testing.
+ */
+ private static class MockSchemaFetcher implements AvroSchemaResolver.SchemaFetcher {
+ private final Schema schemaToReturn;
+ private final Exception exceptionToThrow;
+ boolean fetchCalled = false;
+ String lastRegistryName;
+ String lastSchemaName;
+
+ MockSchemaFetcher(@Nullable Schema schemaToReturn) {
+ this.schemaToReturn = schemaToReturn;
+ this.exceptionToThrow = null;
+ }
+
+ MockSchemaFetcher(Exception exceptionToThrow) {
+ this.schemaToReturn = null;
+ this.exceptionToThrow = exceptionToThrow;
+ }
+
+ @Override
+ public Schema fetchSchema(String registryName, String schemaName) throws Exception {
+ fetchCalled = true;
+ lastRegistryName = registryName;
+ lastSchemaName = schemaName;
+ if (exceptionToThrow != null) {
+ throw exceptionToThrow;
+ }
+ return schemaToReturn;
+ }
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilderPropertyTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilderPropertyTest.java
new file mode 100644
index 00000000..3a5cf58b
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueFormatConfigBuilderPropertyTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.configuration.Configuration;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.Combinators;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.Provide;
+import net.jqwik.api.Tag;
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Property-based tests for {@link GlueFormatConfigBuilder}.
+ *
+ * Property 6: Config map builder correctly maps all provided options
+ *
+ *
Validates: Requirements 5.1, 5.2
+ */
+@Tag("Feature: gsr-flink-sql-formats, Property 6: Config map builder correctly maps all provided"
+ + " options")
+class GlueFormatConfigBuilderPropertyTest {
+
+ /**
+ * For any valid combination of GSR config option values, building the config map via
+ * GlueFormatConfigBuilder.buildConfigMap() should produce a map where each provided option maps
+ * to the correct AWSSchemaRegistryConstants key with the same value, and absent options are not
+ * present in the map.
+ */
+ @Property(tries = 100)
+ void configMapBuilderCorrectlyMapsAllProvidedOptions(
+ @ForAll("gsrConfigCombinations") GsrConfigInput input) {
+
+ Configuration config = new Configuration();
+
+ input.region.ifPresent(v -> config.set(GlueFormatOptions.AWS_REGION, v));
+ input.endpoint.ifPresent(v -> config.set(GlueFormatOptions.AWS_ENDPOINT, v));
+ input.registryName.ifPresent(v -> config.set(GlueFormatOptions.REGISTRY_NAME, v));
+ input.schemaName.ifPresent(v -> config.set(GlueFormatOptions.SCHEMA_NAME, v));
+ input.cacheSize.ifPresent(v -> config.set(GlueFormatOptions.CACHE_SIZE, v));
+ input.cacheTtlMs.ifPresent(v -> config.set(GlueFormatOptions.CACHE_TTL_MS, v));
+ input.autoRegistration.ifPresent(
+ v -> config.set(GlueFormatOptions.SCHEMA_AUTO_REGISTRATION, v));
+ input.compatibility.ifPresent(
+ v -> config.set(GlueFormatOptions.SCHEMA_COMPATIBILITY, v));
+ input.compression.ifPresent(
+ v -> config.set(GlueFormatOptions.SCHEMA_COMPRESSION, v));
+
+ Map result = GlueFormatConfigBuilder.buildConfigMap(config);
+
+ // Verify present options map to correct keys with correct values
+ assertOptionalMapping(
+ result, input.region, AWSSchemaRegistryConstants.AWS_REGION);
+ assertOptionalMapping(
+ result, input.endpoint, AWSSchemaRegistryConstants.AWS_ENDPOINT);
+ assertOptionalMapping(
+ result, input.registryName, AWSSchemaRegistryConstants.REGISTRY_NAME);
+ assertOptionalMapping(
+ result, input.schemaName, AWSSchemaRegistryConstants.SCHEMA_NAME);
+ assertOptionalMapping(
+ result, input.cacheSize, AWSSchemaRegistryConstants.CACHE_SIZE);
+ assertOptionalMapping(
+ result, input.cacheTtlMs, AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS);
+ assertOptionalMapping(
+ result,
+ input.autoRegistration,
+ AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING);
+ assertOptionalMapping(
+ result, input.compatibility, AWSSchemaRegistryConstants.COMPATIBILITY_SETTING);
+ assertOptionalMapping(
+ result, input.compression, AWSSchemaRegistryConstants.COMPRESSION_TYPE);
+
+ // Verify map size equals number of provided options
+ long providedCount =
+ countPresent(
+ input.region,
+ input.endpoint,
+ input.registryName,
+ input.schemaName,
+ input.cacheSize,
+ input.cacheTtlMs,
+ input.autoRegistration,
+ input.compatibility,
+ input.compression);
+ assertThat(result).hasSize((int) providedCount);
+ }
+
+ @Provide
+ Arbitrary gsrConfigCombinations() {
+ Arbitrary> optRegion =
+ Arbitraries.strings().alpha().ofMinLength(1).ofMaxLength(20).optional();
+ Arbitrary> optEndpoint =
+ Arbitraries.strings()
+ .alpha()
+ .ofMinLength(1)
+ .ofMaxLength(30)
+ .map(s -> "https://" + s)
+ .optional();
+ Arbitrary> optRegistryName =
+ Arbitraries.strings().alpha().ofMinLength(1).ofMaxLength(30).optional();
+ Arbitrary> optSchemaName =
+ Arbitraries.strings().alpha().ofMinLength(1).ofMaxLength(30).optional();
+ Arbitrary> optCacheSize =
+ Arbitraries.integers().between(1, 10000).optional();
+ Arbitrary> optCacheTtlMs =
+ Arbitraries.longs().between(1000L, 172800000L).optional();
+ Arbitrary> optAutoReg = Arbitraries.of(true, false).optional();
+ Arbitrary> optCompat =
+ Arbitraries.of(Compatibility.knownValues().toArray(new Compatibility[0]))
+ .optional();
+ Arbitrary> optCompress =
+ Arbitraries.of(AWSSchemaRegistryConstants.COMPRESSION.values()).optional();
+
+ // jqwik Combinators.combine supports up to 8 params, so we nest via flatMap
+ return Combinators.combine(
+ optRegion,
+ optEndpoint,
+ optRegistryName,
+ optSchemaName,
+ optCacheSize,
+ optCacheTtlMs,
+ optAutoReg,
+ optCompat)
+ .flatAs(
+ (region, endpoint, registry, schema, cache, ttl, autoReg, compat) ->
+ optCompress.map(
+ compress ->
+ new GsrConfigInput(
+ region,
+ endpoint,
+ registry,
+ schema,
+ cache,
+ ttl,
+ autoReg,
+ compat,
+ compress)));
+ }
+
+ private static void assertOptionalMapping(
+ Map result, Optional optionalValue, String expectedKey) {
+ if (optionalValue.isPresent()) {
+ assertThat(result).containsEntry(expectedKey, optionalValue.get());
+ } else {
+ assertThat(result).doesNotContainKey(expectedKey);
+ }
+ }
+
+ private static long countPresent(Optional>... optionals) {
+ long count = 0;
+ for (Optional> opt : optionals) {
+ if (opt.isPresent()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /** Value object holding an arbitrary combination of GSR config options. */
+ static class GsrConfigInput {
+ final Optional region;
+ final Optional endpoint;
+ final Optional registryName;
+ final Optional schemaName;
+ final Optional cacheSize;
+ final Optional cacheTtlMs;
+ final Optional autoRegistration;
+ final Optional compatibility;
+ final Optional compression;
+
+ GsrConfigInput(
+ Optional region,
+ Optional endpoint,
+ Optional registryName,
+ Optional schemaName,
+ Optional cacheSize,
+ Optional cacheTtlMs,
+ Optional autoRegistration,
+ Optional compatibility,
+ Optional compression) {
+ this.region = region;
+ this.endpoint = endpoint;
+ this.registryName = registryName;
+ this.schemaName = schemaName;
+ this.cacheSize = cacheSize;
+ this.cacheTtlMs = cacheTtlMs;
+ this.autoRegistration = autoRegistration;
+ this.compatibility = compatibility;
+ this.compression = compression;
+ }
+
+ @Override
+ public String toString() {
+ return "GsrConfigInput{"
+ + "region="
+ + region
+ + ", endpoint="
+ + endpoint
+ + ", registryName="
+ + registryName
+ + ", schemaName="
+ + schemaName
+ + ", cacheSize="
+ + cacheSize
+ + ", cacheTtlMs="
+ + cacheTtlMs
+ + ", autoRegistration="
+ + autoRegistration
+ + ", compatibility="
+ + compatibility
+ + ", compression="
+ + compression
+ + '}';
+ }
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryPropertyTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryPropertyTest.java
new file mode 100644
index 00000000..d7c95b80
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryPropertyTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+
+import net.jqwik.api.Arbitraries;
+import net.jqwik.api.Arbitrary;
+import net.jqwik.api.Combinators;
+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.Provide;
+import net.jqwik.api.Tag;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Property-based tests for required option validation in {@link
+ * GlueSchemaRegistryAvroFormatFactory}.
+ *
+ * Validates: Requirements 1.2, 3.2, 4.2, 5.3
+ */
+@Tag("Feature: gsr-flink-sql-formats, Property 1: Required option validation across all format"
+ + " factories")
+class GlueSchemaRegistryAvroFormatFactoryPropertyTest {
+
+ private static final ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.INT()),
+ Column.physical("c", DataTypes.BOOLEAN()));
+
+ /**
+ * For any subset of required options that is missing at least one required option, creating a
+ * table source should throw a ValidationException.
+ *
+ *
Required options: aws.region, registry.name, schema.name
+ */
+ @Property(tries = 100)
+ void missingAnyRequiredOptionCausesValidationException(
+ @ForAll("incompleteRequiredOptionSets") RequiredOptionSubset subset) {
+
+ Map options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+ options.put("format", GlueSchemaRegistryAvroFormatFactory.IDENTIFIER);
+
+ if (subset.includeRegion) {
+ options.put("avro-glue.aws.region", subset.regionValue);
+ }
+ if (subset.includeRegistryName) {
+ options.put("avro-glue.registry.name", subset.registryNameValue);
+ }
+ if (subset.includeSchemaName) {
+ options.put("avro-glue.schema.name", subset.schemaNameValue);
+ }
+
+ assertThatThrownBy(() -> createTableSource(SCHEMA, options))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Provide
+ Arbitrary incompleteRequiredOptionSets() {
+ Arbitrary bools = Arbitraries.of(true, false);
+ Arbitrary regionValues =
+ Arbitraries.of("us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1");
+ Arbitrary registryValues =
+ Arbitraries.strings().alpha().ofMinLength(1).ofMaxLength(20);
+ Arbitrary schemaValues =
+ Arbitraries.strings().alpha().ofMinLength(1).ofMaxLength(20);
+
+ return Combinators.combine(bools, bools, bools, regionValues, registryValues, schemaValues)
+ .as(RequiredOptionSubset::new)
+ // Filter to only keep subsets where at least one required option is missing
+ .filter(s -> !(s.includeRegion && s.includeRegistryName && s.includeSchemaName));
+ }
+
+ /** Value object representing a subset of required options. */
+ static class RequiredOptionSubset {
+ final boolean includeRegion;
+ final boolean includeRegistryName;
+ final boolean includeSchemaName;
+ final String regionValue;
+ final String registryNameValue;
+ final String schemaNameValue;
+
+ RequiredOptionSubset(
+ boolean includeRegion,
+ boolean includeRegistryName,
+ boolean includeSchemaName,
+ String regionValue,
+ String registryNameValue,
+ String schemaNameValue) {
+ this.includeRegion = includeRegion;
+ this.includeRegistryName = includeRegistryName;
+ this.includeSchemaName = includeSchemaName;
+ this.regionValue = regionValue;
+ this.registryNameValue = registryNameValue;
+ this.schemaNameValue = schemaNameValue;
+ }
+
+ @Override
+ public String toString() {
+ return "RequiredOptionSubset{"
+ + "region="
+ + (includeRegion ? regionValue : "")
+ + ", registryName="
+ + (includeRegistryName ? registryNameValue : "")
+ + ", schemaName="
+ + (includeSchemaName ? schemaNameValue : "")
+ + '}';
+ }
+ }
+}
diff --git a/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryTest.java b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryTest.java
new file mode 100644
index 00000000..250d30f9
--- /dev/null
+++ b/flink-formats-aws/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactoryTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.glue.schema.registry;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the {@link GlueSchemaRegistryAvroFormatFactory}. */
+class GlueSchemaRegistryAvroFormatFactoryTest {
+
+ private static final ResolvedSchema SCHEMA =
+ ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.INT()),
+ Column.physical("c", DataTypes.BOOLEAN()));
+
+ private static final RowType ROW_TYPE =
+ (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
+
+ private static final String SCHEMA_NAME = "test-subject";
+ private static final String REGISTRY_NAME = "test-registry-name";
+ private static final String REGION = "us-west-2";
+
+ @Test
+ void testDeserializationSchema() {
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, getDefaultOptions());
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ DeserializationSchema actualDeser =
+ scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualDeser).isInstanceOf(AvroRowDataDeserializationSchema.class);
+ }
+
+ @Test
+ void testSerializationSchema() {
+ final DynamicTableSink actualSink = createTableSink(SCHEMA, getDefaultOptions());
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema actualSer =
+ sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualSer).isInstanceOf(AvroRowDataSerializationSchema.class);
+ }
+
+ @Test
+ void testMissingSchemaNameForSink() {
+ final Map options =
+ getModifiedOptions(opts -> opts.remove("avro-glue.schema.name"));
+
+ assertThatThrownBy(() -> createTableSink(SCHEMA, options))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ void testMissingRegionForSource() {
+ final Map options =
+ getModifiedOptions(opts -> opts.remove("avro-glue.aws.region"));
+
+ assertThatThrownBy(() -> createTableSource(SCHEMA, options))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ void testMissingRegistryNameForSource() {
+ final Map options =
+ getModifiedOptions(opts -> opts.remove("avro-glue.registry.name"));
+
+ assertThatThrownBy(() -> createTableSource(SCHEMA, options))
+ .isInstanceOf(ValidationException.class);
+ }
+
+ @Test
+ void testDeserializationSchemaWithNamespaceOverride() {
+ final String customNamespace = "com.example.custom";
+ final Map options =
+ getModifiedOptions(
+ opts -> opts.put("avro-glue.avro.namespace", customNamespace));
+
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ DeserializationSchema actualDeser =
+ scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+ // Verify the schema is created successfully with namespace override
+ assertThat(actualDeser).isInstanceOf(AvroRowDataDeserializationSchema.class);
+ }
+
+ @Test
+ void testSerializationSchemaWithRecordNameOverride() {
+ final String customRecordName = "MyCustomRecord";
+ final Map options =
+ getModifiedOptions(
+ opts -> opts.put("avro-glue.avro.record-name", customRecordName));
+
+ final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema actualSer =
+ sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+ // Verify the schema is created successfully with record name override
+ assertThat(actualSer).isInstanceOf(AvroRowDataSerializationSchema.class);
+ }
+
+ @Test
+ void testSpiDiscovery() {
+ final DynamicTableSource source = createTableSource(SCHEMA, getDefaultOptions());
+ assertThat(source).isNotNull();
+
+ final DynamicTableSink sink = createTableSink(SCHEMA, getDefaultOptions());
+ assertThat(sink).isNotNull();
+ }
+
+ @Test
+ void testDeserializationSchemaWithFetchFromRegistry() {
+ final Map options =
+ getModifiedOptions(
+ opts -> opts.put("avro-glue.schema.fetchFromRegistry", "true"));
+
+ // Should not throw - the option should be accepted
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ DeserializationSchema actualDeser =
+ scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualDeser).isInstanceOf(AvroRowDataDeserializationSchema.class);
+ }
+
+ @Test
+ void testSerializationSchemaWithFetchFromRegistry() {
+ final Map options =
+ getModifiedOptions(
+ opts -> opts.put("avro-glue.schema.fetchFromRegistry", "true"));
+
+ // Should not throw - the option should be accepted
+ final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
+ assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+ (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema actualSer =
+ sinkMock.valueFormat.createRuntimeEncoder(null, SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualSer).isInstanceOf(AvroRowDataSerializationSchema.class);
+ }
+
+ @Test
+ void testDeserializationSchemaWithAllOverrideOptions() {
+ final Map options =
+ getModifiedOptions(
+ opts -> {
+ opts.put("avro-glue.avro.namespace", "com.example.custom");
+ opts.put("avro-glue.avro.record-name", "CustomRecord");
+ opts.put("avro-glue.schema.fetchFromRegistry", "false");
+ });
+
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, options);
+ assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+
+ TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ DeserializationSchema actualDeser =
+ scanSourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualDeser).isInstanceOf(AvroRowDataDeserializationSchema.class);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private Map getModifiedOptions(Consumer