diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala index 4a844db6cf17a..8210cd48e291d 100644 --- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala +++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala @@ -59,7 +59,15 @@ class FlinkScalaKryoInstantiator { def newKryo: Kryo = { val k = new Kryo k.setRegistrationRequired(false) - k.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy) + // Use DefaultInstantiatorStrategy as the primary strategy (which tries no-arg constructors + // first via reflection), and only falls back to StdInstantiatorStrategy (Objenesis, which + // bypasses constructors entirely) when no suitable constructor is found. + // Previously this was set to pure StdInstantiatorStrategy, which bypasses all constructors + // and can leave fields uninitialized (e.g., causing NPE in classes like Iceberg's + // SerializableByteBufferMap that rely on constructor initialization). + val initStrategy = new com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy() + initStrategy.setFallbackInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy) + k.setInstantiatorStrategy(initStrategy) k.register(classOf[Unit], new VoidSerializer) // The wrappers are private classes: useFieldSerializer(k, List(1, 2, 3).asJava.getClass) diff --git a/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java b/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java new file mode 100644 index 0000000000000..1cc0329665412 --- /dev/null +++ b/flink-table/flink-table-api-scala/src/test/java/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiatorTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.runtime.types; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link FlinkScalaKryoInstantiator}. + * + *
Verifies that the Kryo instance created by {@link FlinkScalaKryoInstantiator} uses {@link + * DefaultInstantiatorStrategy} as the primary strategy (which invokes no-arg constructors via + * reflection), with {@link StdInstantiatorStrategy} as a fallback (which uses Objenesis to bypass + * constructors). + * + *
This is critical because using a pure {@link StdInstantiatorStrategy} bypasses all + * constructors, leaving fields uninitialized (null) for classes that rely on constructor + * initialization (e.g., Iceberg's SerializableByteBufferMap). + */ +class FlinkScalaKryoInstantiatorTest { + + private Kryo kryo; + + @BeforeEach + void setUp() { + FlinkScalaKryoInstantiator instantiator = new FlinkScalaKryoInstantiator(); + kryo = instantiator.newKryo(); + } + + /** Verifies that the primary InstantiatorStrategy is DefaultInstantiatorStrategy. */ + @Test + void testInstantiatorStrategyIsDefaultWithFallback() { + assertThat(kryo.getInstantiatorStrategy()).isInstanceOf(DefaultInstantiatorStrategy.class); + + DefaultInstantiatorStrategy strategy = + (DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy(); + + assertThat(strategy.getFallbackInstantiatorStrategy()) + .isInstanceOf(StdInstantiatorStrategy.class); + } + + /** + * Verifies that a class with a no-arg constructor that initializes fields is properly + * instantiated by Kryo (constructor is called, fields are initialized). + * + *
This test simulates the real-world scenario where Iceberg's SerializableByteBufferMap + * initializes its internal 'wrapped' map in the constructor. Without the fix, the pure + * StdInstantiatorStrategy would bypass the constructor, leaving 'wrapped' as null, causing NPE + * during deserialization when MapSerializer tries to call map.put(). + */ + @Test + void testClassWithNoArgConstructorIsProperlyInitialized() { + // Verify that newInstance invokes the constructor + PojoWithConstructorInit instance = kryo.newInstance(PojoWithConstructorInit.class); + assertThat(instance).isNotNull(); + assertThat(instance.getInternalMap()) + .as( + "The internal map should be initialized by the constructor, " + + "not null (which would happen if the constructor was bypassed)") + .isNotNull(); + assertThat(instance.getInitFlag()) + .as("The init flag should be set to true by the constructor") + .isTrue(); + } + + /** + * Verifies that Kryo serialization round-trip works correctly for a class with a Map field + * initialized in the constructor. + * + *
This is the end-to-end test that reproduces the exact failure scenario: serialize an
+ * object with map entries, then deserialize it. Without the fix, MapSerializer.read() would
+ * call map.put() on a null map, causing NPE.
+ */
+ @Test
+ void testSerializationRoundTripWithMapField() {
+ PojoWithConstructorInit original = new PojoWithConstructorInit();
+ original.getInternalMap().put("key1", "value1");
+ original.getInternalMap().put("key2", "value2");
+
+ // Serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Output output = new Output(baos);
+ kryo.writeClassAndObject(output, original);
+ output.close();
+
+ // Deserialize
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Input input = new Input(bais);
+ Object deserialized = kryo.readClassAndObject(input);
+ input.close();
+
+ assertThat(deserialized).isInstanceOf(PojoWithConstructorInit.class);
+ PojoWithConstructorInit result = (PojoWithConstructorInit) deserialized;
+ assertThat(result.getInternalMap()).as("Deserialized map should not be null").isNotNull();
+ assertThat(result.getInternalMap())
+ .as("Deserialized map should contain the same entries")
+ .containsEntry("key1", "value1")
+ .containsEntry("key2", "value2")
+ .hasSize(2);
+ }
+
+ /**
+ * Verifies that classes without a no-arg constructor can still be instantiated via the
+ * Objenesis fallback strategy.
+ */
+ @Test
+ void testClassWithoutNoArgConstructorUsesObjenesisFallback() {
+ PojoWithoutNoArgConstructor instance = kryo.newInstance(PojoWithoutNoArgConstructor.class);
+ // Objenesis bypasses the constructor, so the object is created but fields are
+ // uninitialized.
+ // This is expected and correct behavior for classes without no-arg constructors.
+ assertThat(instance).isNotNull();
+ }
+
+ /**
+ * Verifies that the KryoSerializer (which loads FlinkScalaKryoInstantiator via reflection when
+ * it's on the classpath) produces a Kryo instance with the correct InstantiatorStrategy.
+ */
+ @Test
+ void testKryoSerializerUsesCorrectStrategy() {
+ // When FlinkScalaKryoInstantiator is on the classpath (which it is in this test module),
+ // KryoSerializer.getKryoInstance() will use it to create the Kryo instance.
+ org.apache.flink.api.common.serialization.SerializerConfigImpl config =
+ new org.apache.flink.api.common.serialization.SerializerConfigImpl();
+ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer