diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java index 793c8f3e9b387..ec9417d2dfddf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java @@ -122,6 +122,21 @@ public SnapshotResultSupplier asyncSnapshot( Map> registeredBroadcastStatesDeepCopies = syncPartResource.getRegisteredBroadcastStatesDeepCopies(); + if (registeredBroadcastStatesDeepCopies.isEmpty() + && hasOnlyEmptyOperatorListStates(registeredOperatorStatesDeepCopies)) { + if (streamFactory instanceof FsMergingCheckpointStorageLocation) { + FsMergingCheckpointStorageLocation location = + (FsMergingCheckpointStorageLocation) streamFactory; + return snapshotCloseableRegistry -> + SnapshotResult.of( + EmptyFileMergingOperatorStreamStateHandle.create( + location.getExclusiveStateHandle(), + location.getSharedStateHandle())); + } else { + return snapshotCloseableRegistry -> SnapshotResult.empty(); + } + } + if (registeredBroadcastStatesDeepCopies.isEmpty() && registeredOperatorStatesDeepCopies.isEmpty()) { if (streamFactory instanceof FsMergingCheckpointStorageLocation) { @@ -236,6 +251,20 @@ public SnapshotResultSupplier asyncSnapshot( }; } + private boolean hasOnlyEmptyOperatorListStates( + Map> registeredOperatorStatesDeepCopies) { + if (registeredOperatorStatesDeepCopies.isEmpty()) { + return false; + } + + for (PartitionableListState listState : registeredOperatorStatesDeepCopies.values()) { + if (listState == null || listState.get().iterator().hasNext()) { + return false; + } + } + return true; + } + static class DefaultOperatorStateBackendSnapshotResources implements SnapshotResources { private final Map> registeredOperatorStatesDeepCopies; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 3d684426e7a74..9a72e33670498 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle; import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation; @@ -489,6 +490,124 @@ void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception { assertThat(stateHandle).isInstanceOf(FileMergingOperatorStreamStateHandle.class); } + @Test + void testSnapshotWithEmptyRegisteredOperatorState() throws Exception { + final AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + + Environment env = createMockEnvironment(); + final OperatorStateBackend operatorStateBackend = + abstractStateBackend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + env, "testOperator", emptyStateHandles, cancelStreamRegistry)); + + ListStateDescriptor stateDescriptor = + new ListStateDescriptor<>("test-empty-list-state", IntSerializer.INSTANCE); + try { + ListState listState = operatorStateBackend.getListState(stateDescriptor); + assertThat(listState.get()).isEmpty(); + + CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); + + RunnableFuture> snapshot = + operatorStateBackend.snapshot( + 0L, + 0L, + streamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()); + + SnapshotResult snapshotResult = + FutureUtils.runIfNotDoneAndGet(snapshot); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + assertThat(stateHandle).isNull(); + } finally { + operatorStateBackend.close(); + operatorStateBackend.dispose(); + } + } + + @Test + void testFileMergingSnapshotRestoreWithEmptyRegisteredUnionState(@TempDir File tmpFolder) + throws Exception { + final AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + + Environment env = createMockEnvironment(); + OperatorStateBackend operatorStateBackend = + abstractStateBackend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + env, "testOperator", emptyStateHandles, cancelStreamRegistry)); + + ListStateDescriptor stateDescriptor = + new ListStateDescriptor<>("test-empty-union-state", IntSerializer.INSTANCE); + ListState unionState = operatorStateBackend.getUnionListState(stateDescriptor); + assertThat(unionState.get()).isEmpty(); + + Path checkpointBaseDir = new Path(tmpFolder.toString()); + Path sharedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR); + Path taskOwnedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR); + + JobID jobId = JobID.generate(); + final FileMergingSnapshotManager.SubtaskKey subtaskKey = + new FileMergingSnapshotManager.SubtaskKey(jobId.toHexString(), "opId", 1, 1); + LocalFileSystem fs = getSharedInstance(); + CheckpointStorageLocationReference cslReference = + AbstractFsCheckpointStorageAccess.encodePathAsReference( + fromLocalFile(fs.pathToFile(checkpointBaseDir))); + FileMergingSnapshotManager snapshotManager = + createFileMergingSnapshotManager( + checkpointBaseDir, sharedStateDir, taskOwnedStateDir, subtaskKey); + CheckpointStreamFactory streamFactory = + new FsMergingCheckpointStorageLocation( + subtaskKey, + fs, + checkpointBaseDir, + sharedStateDir, + taskOwnedStateDir, + cslReference, + 1024, + 1024, + snapshotManager, + 0); + + OperatorStateHandle stateHandle = null; + try { + RunnableFuture> snapshot = + operatorStateBackend.snapshot( + 0L, + 0L, + streamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()); + + SnapshotResult snapshotResult = + FutureUtils.runIfNotDoneAndGet(snapshot); + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + + assertThat(stateHandle).isInstanceOf(EmptyFileMergingOperatorStreamStateHandle.class); + + operatorStateBackend = + recreateOperatorStateBackend( + operatorStateBackend, + abstractStateBackend, + StateObjectCollection.singleton(stateHandle)); + + unionState = operatorStateBackend.getUnionListState(stateDescriptor); + assertThat(unionState.get()).isEmpty(); + } finally { + operatorStateBackend.close(); + operatorStateBackend.dispose(); + if (stateHandle != null) { + stateHandle.discardState(); + } + } + } + @Test void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception { final AbstractStateBackend abstractStateBackend = new HashMapStateBackend();