diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java index 739788a3241fb..24054298fdbe4 100644 --- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -35,9 +35,6 @@ * the current element. That way, the system can handle stream and state partitioning consistently * together. * - *

The user value could be null, but change log state backend is not compatible with the user - * value is null, see FLINK-38144 for more details. - * * @param Type of the keys in the state. * @param Type of the values in the state. */ diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java index d698f7f9dc8d6..3f71e99cd7ed0 100644 --- a/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/MapState.java @@ -35,9 +35,6 @@ * the current element. That way, the system can handle stream and state partitioning consistently * together. * - *

The user value could be null, but change log state backend is not compatible with the user - * value is null, see FLINK-38144 for more details. - * * @param Type of the keys in the state. * @param Type of the values in the state. */ diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java index 0beebacc9e3c8..9a5e1a1efe4f1 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java @@ -33,6 +33,7 @@ import org.apache.flink.util.function.ThrowingConsumer; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.Objects; @@ -77,8 +78,12 @@ public UV getValue() { public UV setValue(UV value) { UV oldValue = entry.setValue(value); try { - changeLogger.valueElementAddedOrUpdated( - getWriter(entry.getKey(), entry.getValue()), ns); + if (value == null) { + changeLogger.valueAdded(Collections.singletonMap(entry.getKey(), null), ns); + } else { + changeLogger.valueElementAddedOrUpdated( + getWriter(entry.getKey(), entry.getValue()), ns); + } } catch (IOException e) { ExceptionUtils.rethrow(e); } @@ -105,7 +110,11 @@ public UV get(UK key) throws Exception { @Override public void put(UK key, UV value) throws Exception { delegatedState.put(key, value); - changeLogger.valueElementAddedOrUpdated(getWriter(key, value), getCurrentNamespace()); + if (value == null) { + changeLogger.valueAdded(Collections.singletonMap(key, null), getCurrentNamespace()); + } else { + changeLogger.valueElementAddedOrUpdated(getWriter(key, value), getCurrentNamespace()); + } } @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java index bc15a0fa81b23..495b452d8bdb2 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateEmbeddedRocksDBStateBackendTest.java @@ -139,7 +139,6 @@ protected boolean checkMetrics() { // Follow https://issues.apache.org/jira/browse/FLINK-38144 @Override @TestTemplate - @Disabled("Currently, ChangelogStateBackend does not support null values for map state") public void testMapStateWithNullValue() throws Exception { super.testMapStateWithNullValue(); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java index 8dda3deccf6a1..b20e6ad3e92f2 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.testutils.junit.utils.TempDirUtils; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.io.TempDir; @@ -115,7 +114,6 @@ public void testMaterializedRestorePriorityQueue() throws Exception { // Follow https://issues.apache.org/jira/browse/FLINK-38144 @Override @TestTemplate - @Disabled("Currently, ChangelogStateBackend does not support null values for map state") public void testMapStateWithNullValue() throws Exception { super.testMapStateWithNullValue(); }