From a4ba7a52c2d36a8a07e021080237e810da0fa2c8 Mon Sep 17 00:00:00 2001 From: Myracle Date: Thu, 12 Mar 2026 15:46:44 +0800 Subject: [PATCH] [FLINK-39291][API / Type Serialization System] FlinkScalaKryoInstantiator InstantiatorStrategy fix --- .../types/FlinkScalaKryoInstantiator.scala | 10 +- .../types/FlinkScalaKryoInstantiatorTest.java | 214 ++++++++++++++++++ 2 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala index 4a844db6cf17a..8210cd48e291d 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala @@ -59,7 +59,15 @@ class FlinkScalaKryoInstantiator { def newKryo: Kryo = { val k = new Kryo k.setRegistrationRequired(false) - k.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy) + // Use DefaultInstantiatorStrategy as the primary strategy (which tries no-arg constructors + // first via reflection), and only falls back to StdInstantiatorStrategy (Objenesis, which + // bypasses constructors entirely) when no suitable constructor is found. + // Previously this was set to pure StdInstantiatorStrategy, which bypasses all constructors + // and can leave fields uninitialized (e.g., causing NPE in classes like Iceberg's + // SerializableByteBufferMap that rely on constructor initialization). + val initStrategy = new com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy() + initStrategy.setFallbackInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy) + k.setInstantiatorStrategy(initStrategy) k.register(classOf[Unit], new VoidSerializer) // The wrappers are private classes: useFieldSerializer(k, List(1, 2, 3).asJava.getClass) diff --git a/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java b/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java new file mode 100644 index 0000000000000..1cc0329665412 --- /dev/null +++ b/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.runtime.types; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link FlinkScalaKryoInstantiator}. + * + *

Verifies that the Kryo instance created by {@link FlinkScalaKryoInstantiator} uses {@link + * DefaultInstantiatorStrategy} as the primary strategy (which invokes no-arg constructors via + * reflection), with {@link StdInstantiatorStrategy} as a fallback (which uses Objenesis to bypass + * constructors). + * + *

This is critical because using a pure {@link StdInstantiatorStrategy} bypasses all + * constructors, leaving fields uninitialized (null) for classes that rely on constructor + * initialization (e.g., Iceberg's SerializableByteBufferMap). + */ +class FlinkScalaKryoInstantiatorTest { + + private Kryo kryo; + + @BeforeEach + void setUp() { + FlinkScalaKryoInstantiator instantiator = new FlinkScalaKryoInstantiator(); + kryo = instantiator.newKryo(); + } + + /** Verifies that the primary InstantiatorStrategy is DefaultInstantiatorStrategy. */ + @Test + void testInstantiatorStrategyIsDefaultWithFallback() { + assertThat(kryo.getInstantiatorStrategy()).isInstanceOf(DefaultInstantiatorStrategy.class); + + DefaultInstantiatorStrategy strategy = + (DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy(); + + assertThat(strategy.getFallbackInstantiatorStrategy()) + .isInstanceOf(StdInstantiatorStrategy.class); + } + + /** + * Verifies that a class with a no-arg constructor that initializes fields is properly + * instantiated by Kryo (constructor is called, fields are initialized). + * + *

This test simulates the real-world scenario where Iceberg's SerializableByteBufferMap + * initializes its internal 'wrapped' map in the constructor. Without the fix, the pure + * StdInstantiatorStrategy would bypass the constructor, leaving 'wrapped' as null, causing NPE + * during deserialization when MapSerializer tries to call map.put(). + */ + @Test + void testClassWithNoArgConstructorIsProperlyInitialized() { + // Verify that newInstance invokes the constructor + PojoWithConstructorInit instance = kryo.newInstance(PojoWithConstructorInit.class); + assertThat(instance).isNotNull(); + assertThat(instance.getInternalMap()) + .as( + "The internal map should be initialized by the constructor, " + + "not null (which would happen if the constructor was bypassed)") + .isNotNull(); + assertThat(instance.getInitFlag()) + .as("The init flag should be set to true by the constructor") + .isTrue(); + } + + /** + * Verifies that Kryo serialization round-trip works correctly for a class with a Map field + * initialized in the constructor. + * + *

This is the end-to-end test that reproduces the exact failure scenario: serialize an + * object with map entries, then deserialize it. Without the fix, MapSerializer.read() would + * call map.put() on a null map, causing NPE. + */ + @Test + void testSerializationRoundTripWithMapField() { + PojoWithConstructorInit original = new PojoWithConstructorInit(); + original.getInternalMap().put("key1", "value1"); + original.getInternalMap().put("key2", "value2"); + + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + kryo.writeClassAndObject(output, original); + output.close(); + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + Input input = new Input(bais); + Object deserialized = kryo.readClassAndObject(input); + input.close(); + + assertThat(deserialized).isInstanceOf(PojoWithConstructorInit.class); + PojoWithConstructorInit result = (PojoWithConstructorInit) deserialized; + assertThat(result.getInternalMap()).as("Deserialized map should not be null").isNotNull(); + assertThat(result.getInternalMap()) + .as("Deserialized map should contain the same entries") + .containsEntry("key1", "value1") + .containsEntry("key2", "value2") + .hasSize(2); + } + + /** + * Verifies that classes without a no-arg constructor can still be instantiated via the + * Objenesis fallback strategy. + */ + @Test + void testClassWithoutNoArgConstructorUsesObjenesisFallback() { + PojoWithoutNoArgConstructor instance = kryo.newInstance(PojoWithoutNoArgConstructor.class); + // Objenesis bypasses the constructor, so the object is created but fields are + // uninitialized. + // This is expected and correct behavior for classes without no-arg constructors. + assertThat(instance).isNotNull(); + } + + /** + * Verifies that the KryoSerializer (which loads FlinkScalaKryoInstantiator via reflection when + * it's on the classpath) produces a Kryo instance with the correct InstantiatorStrategy. + */ + @Test + void testKryoSerializerUsesCorrectStrategy() { + // When FlinkScalaKryoInstantiator is on the classpath (which it is in this test module), + // KryoSerializer.getKryoInstance() will use it to create the Kryo instance. + org.apache.flink.api.common.serialization.SerializerConfigImpl config = + new org.apache.flink.api.common.serialization.SerializerConfigImpl(); + org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer + kryoSerializer = + new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>( + PojoWithConstructorInit.class, config); + Kryo kryoFromSerializer = kryoSerializer.getKryo(); + + assertThat(kryoFromSerializer.getInstantiatorStrategy()) + .isInstanceOf(DefaultInstantiatorStrategy.class); + + DefaultInstantiatorStrategy strategy = + (DefaultInstantiatorStrategy) kryoFromSerializer.getInstantiatorStrategy(); + assertThat(strategy.getFallbackInstantiatorStrategy()) + .isInstanceOf(StdInstantiatorStrategy.class); + } + + // ------------------------------------------------------------------------- + // Test helper classes + // ------------------------------------------------------------------------- + + /** + * A simple POJO that initializes an internal Map in its no-arg constructor. This simulates + * classes like Iceberg's SerializableByteBufferMap. + */ + public static class PojoWithConstructorInit implements Serializable { + private static final long serialVersionUID = 1L; + + private final Map internalMap; + private final boolean initFlag; + + public PojoWithConstructorInit() { + this.internalMap = new HashMap<>(); + this.initFlag = true; + } + + public Map getInternalMap() { + return internalMap; + } + + public boolean getInitFlag() { + return initFlag; + } + } + + /** + * A class without a no-arg constructor. Kryo should fall back to Objenesis + * (StdInstantiatorStrategy) for this class. + */ + public static class PojoWithoutNoArgConstructor implements Serializable { + private static final long serialVersionUID = 1L; + + private final String value; + + public PojoWithoutNoArgConstructor(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } +}