Skip to content

Commit a4ba7a5

Browse files
committed
[FLINK-39291][API / Type Serialization System] FlinkScalaKryoInstantiator InstantiatorStrategy fix
1 parent df4f8c4 commit a4ba7a5

File tree

2 files changed

+223
-1
lines changed

2 files changed

+223
-1
lines changed

flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/runtime/types/FlinkScalaKryoInstantiator.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,15 @@ class FlinkScalaKryoInstantiator {
5959
def newKryo: Kryo = {
6060
val k = new Kryo
6161
k.setRegistrationRequired(false)
62-
k.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy)
62+
// Use DefaultInstantiatorStrategy as the primary strategy (which tries no-arg constructors
63+
// first via reflection), and only falls back to StdInstantiatorStrategy (Objenesis, which
64+
// bypasses constructors entirely) when no suitable constructor is found.
65+
// Previously this was set to pure StdInstantiatorStrategy, which bypasses all constructors
66+
// and can leave fields uninitialized (e.g., causing NPE in classes like Iceberg's
67+
// SerializableByteBufferMap that rely on constructor initialization).
68+
val initStrategy = new com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy()
69+
initStrategy.setFallbackInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy)
70+
k.setInstantiatorStrategy(initStrategy)
6371
k.register(classOf[Unit], new VoidSerializer)
6472
// The wrappers are private classes:
6573
useFieldSerializer(k, List(1, 2, 3).asJava.getClass)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.api.runtime.types;
20+
21+
import com.esotericsoftware.kryo.Kryo;
22+
import com.esotericsoftware.kryo.io.Input;
23+
import com.esotericsoftware.kryo.io.Output;
24+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.objenesis.strategy.StdInstantiatorStrategy;
28+
29+
import java.io.ByteArrayInputStream;
30+
import java.io.ByteArrayOutputStream;
31+
import java.io.Serializable;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
/**
38+
* Tests for {@link FlinkScalaKryoInstantiator}.
39+
*
40+
* <p>Verifies that the Kryo instance created by {@link FlinkScalaKryoInstantiator} uses {@link
41+
* DefaultInstantiatorStrategy} as the primary strategy (which invokes no-arg constructors via
42+
* reflection), with {@link StdInstantiatorStrategy} as a fallback (which uses Objenesis to bypass
43+
* constructors).
44+
*
45+
* <p>This is critical because using a pure {@link StdInstantiatorStrategy} bypasses all
46+
* constructors, leaving fields uninitialized (null) for classes that rely on constructor
47+
* initialization (e.g., Iceberg's SerializableByteBufferMap).
48+
*/
49+
class FlinkScalaKryoInstantiatorTest {
50+
51+
private Kryo kryo;
52+
53+
@BeforeEach
54+
void setUp() {
55+
FlinkScalaKryoInstantiator instantiator = new FlinkScalaKryoInstantiator();
56+
kryo = instantiator.newKryo();
57+
}
58+
59+
/** Verifies that the primary InstantiatorStrategy is DefaultInstantiatorStrategy. */
60+
@Test
61+
void testInstantiatorStrategyIsDefaultWithFallback() {
62+
assertThat(kryo.getInstantiatorStrategy()).isInstanceOf(DefaultInstantiatorStrategy.class);
63+
64+
DefaultInstantiatorStrategy strategy =
65+
(DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy();
66+
67+
assertThat(strategy.getFallbackInstantiatorStrategy())
68+
.isInstanceOf(StdInstantiatorStrategy.class);
69+
}
70+
71+
/**
72+
* Verifies that a class with a no-arg constructor that initializes fields is properly
73+
* instantiated by Kryo (constructor is called, fields are initialized).
74+
*
75+
* <p>This test simulates the real-world scenario where Iceberg's SerializableByteBufferMap
76+
* initializes its internal 'wrapped' map in the constructor. Without the fix, the pure
77+
* StdInstantiatorStrategy would bypass the constructor, leaving 'wrapped' as null, causing NPE
78+
* during deserialization when MapSerializer tries to call map.put().
79+
*/
80+
@Test
81+
void testClassWithNoArgConstructorIsProperlyInitialized() {
82+
// Verify that newInstance invokes the constructor
83+
PojoWithConstructorInit instance = kryo.newInstance(PojoWithConstructorInit.class);
84+
assertThat(instance).isNotNull();
85+
assertThat(instance.getInternalMap())
86+
.as(
87+
"The internal map should be initialized by the constructor, "
88+
+ "not null (which would happen if the constructor was bypassed)")
89+
.isNotNull();
90+
assertThat(instance.getInitFlag())
91+
.as("The init flag should be set to true by the constructor")
92+
.isTrue();
93+
}
94+
95+
/**
96+
* Verifies that Kryo serialization round-trip works correctly for a class with a Map field
97+
* initialized in the constructor.
98+
*
99+
* <p>This is the end-to-end test that reproduces the exact failure scenario: serialize an
100+
* object with map entries, then deserialize it. Without the fix, MapSerializer.read() would
101+
* call map.put() on a null map, causing NPE.
102+
*/
103+
@Test
104+
void testSerializationRoundTripWithMapField() {
105+
PojoWithConstructorInit original = new PojoWithConstructorInit();
106+
original.getInternalMap().put("key1", "value1");
107+
original.getInternalMap().put("key2", "value2");
108+
109+
// Serialize
110+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
111+
Output output = new Output(baos);
112+
kryo.writeClassAndObject(output, original);
113+
output.close();
114+
115+
// Deserialize
116+
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
117+
Input input = new Input(bais);
118+
Object deserialized = kryo.readClassAndObject(input);
119+
input.close();
120+
121+
assertThat(deserialized).isInstanceOf(PojoWithConstructorInit.class);
122+
PojoWithConstructorInit result = (PojoWithConstructorInit) deserialized;
123+
assertThat(result.getInternalMap()).as("Deserialized map should not be null").isNotNull();
124+
assertThat(result.getInternalMap())
125+
.as("Deserialized map should contain the same entries")
126+
.containsEntry("key1", "value1")
127+
.containsEntry("key2", "value2")
128+
.hasSize(2);
129+
}
130+
131+
/**
132+
* Verifies that classes without a no-arg constructor can still be instantiated via the
133+
* Objenesis fallback strategy.
134+
*/
135+
@Test
136+
void testClassWithoutNoArgConstructorUsesObjenesisFallback() {
137+
PojoWithoutNoArgConstructor instance = kryo.newInstance(PojoWithoutNoArgConstructor.class);
138+
// Objenesis bypasses the constructor, so the object is created but fields are
139+
// uninitialized.
140+
// This is expected and correct behavior for classes without no-arg constructors.
141+
assertThat(instance).isNotNull();
142+
}
143+
144+
/**
145+
* Verifies that the KryoSerializer (which loads FlinkScalaKryoInstantiator via reflection when
146+
* it's on the classpath) produces a Kryo instance with the correct InstantiatorStrategy.
147+
*/
148+
@Test
149+
void testKryoSerializerUsesCorrectStrategy() {
150+
// When FlinkScalaKryoInstantiator is on the classpath (which it is in this test module),
151+
// KryoSerializer.getKryoInstance() will use it to create the Kryo instance.
152+
org.apache.flink.api.common.serialization.SerializerConfigImpl config =
153+
new org.apache.flink.api.common.serialization.SerializerConfigImpl();
154+
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<PojoWithConstructorInit>
155+
kryoSerializer =
156+
new org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer<>(
157+
PojoWithConstructorInit.class, config);
158+
Kryo kryoFromSerializer = kryoSerializer.getKryo();
159+
160+
assertThat(kryoFromSerializer.getInstantiatorStrategy())
161+
.isInstanceOf(DefaultInstantiatorStrategy.class);
162+
163+
DefaultInstantiatorStrategy strategy =
164+
(DefaultInstantiatorStrategy) kryoFromSerializer.getInstantiatorStrategy();
165+
assertThat(strategy.getFallbackInstantiatorStrategy())
166+
.isInstanceOf(StdInstantiatorStrategy.class);
167+
}
168+
169+
// -------------------------------------------------------------------------
170+
// Test helper classes
171+
// -------------------------------------------------------------------------
172+
173+
/**
174+
* A simple POJO that initializes an internal Map in its no-arg constructor. This simulates
175+
* classes like Iceberg's SerializableByteBufferMap.
176+
*/
177+
public static class PojoWithConstructorInit implements Serializable {
178+
private static final long serialVersionUID = 1L;
179+
180+
private final Map<String, String> internalMap;
181+
private final boolean initFlag;
182+
183+
public PojoWithConstructorInit() {
184+
this.internalMap = new HashMap<>();
185+
this.initFlag = true;
186+
}
187+
188+
public Map<String, String> getInternalMap() {
189+
return internalMap;
190+
}
191+
192+
public boolean getInitFlag() {
193+
return initFlag;
194+
}
195+
}
196+
197+
/**
198+
* A class without a no-arg constructor. Kryo should fall back to Objenesis
199+
* (StdInstantiatorStrategy) for this class.
200+
*/
201+
public static class PojoWithoutNoArgConstructor implements Serializable {
202+
private static final long serialVersionUID = 1L;
203+
204+
private final String value;
205+
206+
public PojoWithoutNoArgConstructor(String value) {
207+
this.value = value;
208+
}
209+
210+
public String getValue() {
211+
return value;
212+
}
213+
}
214+
}

0 commit comments

Comments
 (0)