diff --git a/docs/layouts/shortcodes/generated/expert_forst_section.html b/docs/layouts/shortcodes/generated/expert_forst_section.html index 81ec199945840..f9b7acec173b0 100644 --- a/docs/layouts/shortcodes/generated/expert_forst_section.html +++ b/docs/layouts/shortcodes/generated/expert_forst_section.html @@ -20,6 +20,12 @@ Integer When the number of eviction that a block in hot link is moved to cold link reaches this value, the block will be blocked from being promoted to the head of the LRU list. The default value is '3'. + +
state.backend.forst.checkpoint.transfer.thread.num
+ 4 + Integer + The number of transfer threads used to write or copy files to the state backend. +
state.backend.forst.executor.inline-coordinator
false diff --git a/docs/layouts/shortcodes/generated/forst_configuration.html b/docs/layouts/shortcodes/generated/forst_configuration.html index f66c1af4d8b3b..c1ce466a793a4 100644 --- a/docs/layouts/shortcodes/generated/forst_configuration.html +++ b/docs/layouts/shortcodes/generated/forst_configuration.html @@ -38,6 +38,12 @@ MemorySize An upper-bound of the size that can be used for cache. User should specify at least one cache size limit to enable the cache, either this option or the 'state.backend.forst.cache.reserve-size' option. They can be set simultaneously, and in this case, cache will grow if meet the requirements of both two options. The default value is '0 bytes', meaning that this option is disabled. + +
state.backend.forst.checkpoint.transfer.thread.num
+ 4 + Integer + The number of transfer threads used to write or copy files to the state backend. +
state.backend.forst.executor.inline-coordinator
false diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index d9b2bc3dd58c4..68aa391871275 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -442,7 +442,7 @@ private ForStRestoreOperation getForStRestoreOperation( long lastCompletedCheckpointId) { ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem()); if (enableIncrementalCheckpointing) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java index 8a816e3e1b13e..b11fcd4d49276 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptions.java @@ -300,4 +300,12 @@ public class ForStOptions { + " Only valid when '" + EXECUTOR_WRITE_IO_INLINE.key() + "' is false."); + + @Documentation.Section(Documentation.Sections.EXPERT_FORST) + public static final ConfigOption CHECKPOINT_TRANSFER_THREAD_NUM = + ConfigOptions.key("state.backend.forst.checkpoint.transfer.thread.num") + .intType() + .defaultValue(4) + .withDescription( + "The number of transfer threads used to write or copy files to the state backend."); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index cb29cf1857cb1..6d7b225af2372 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -324,6 +324,10 @@ public int getWriteIoParallelism() { return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM); } + public int getDataTransferThreadNum() { + return configuration.get(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM); + } + /** * Prepare local and remote directories. * diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java index 0305f8cd02e9a..852fa08405e4b 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java @@ -64,9 +64,6 @@ public class ForStStateDataTransfer implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(ForStStateDataTransfer.class); - // TODO: Add ConfigOption replace this field after ForSt checkpoint implementation stable - public static final int DEFAULT_THREAD_NUM = 4; - protected final ExecutorService executorService; @Nullable private final ForStFlinkFileSystem forStFs; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index f205fe21b9f0d..ca3d683a48410 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -265,7 +265,7 @@ public ForStRestoreResult restore() throws Exception { private void transferAllStateHandles(List specs) throws Exception { try (ForStStateDataTransfer transfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem())) { transfer.transferAllStateDataToDirectory( optionsContainer.getPathContainer(), diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index 0e83b22a0ced2..0f6dcac1e7367 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -465,7 +465,7 @@ public ForStSyncKeyedStateBackendBuilder setRecoveryClaimMode( ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer( - ForStStateDataTransfer.DEFAULT_THREAD_NUM, + optionsContainer.getDataTransferThreadNum(), optionsContainer.getFileSystem()); if (enableIncrementalCheckpointing) { diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index 624bbf0b577db..2a5eb8c9dcf9c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -831,6 +831,20 @@ public void testConfigureQueryTimeAfterNumEntries() throws Exception { } } + @Test + public void testConfigureCheckpointTransferThreadNumber() throws Exception { + ForStStateBackend forStStateBackend = new ForStStateBackend(); + Configuration configuration = new Configuration(); + configuration.setString( + ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.key(), "10"); + forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader()); + + try (ForStResourceContainer resourceContainer = + forStStateBackend.createOptionsAndResourceContainer(null)) { + assertEquals(10, resourceContainer.getDataTransferThreadNum()); + } + } + private void verifySetParameter(Runnable setter) { try { setter.run(); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java index 034df702554ac..e0e738b4540fb 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/snapshot/ForStIncrementalSnapshotStrategyTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.state.forst.ForStExtension; import org.apache.flink.state.forst.ForStOperationUtils; +import org.apache.flink.state.forst.ForStOptions; import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer; import org.apache.flink.testutils.junit.utils.TempDirUtils; @@ -199,7 +200,7 @@ private ForStIncrementalSnapshotStrategy createSnapshotStrategy() CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), new TreeMap<>(), - new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM), + new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()), -1); } @@ -231,7 +232,7 @@ private ForStNativeFullSnapshotStrategy createFullSnapshotStrategy() new KeyGroupRange(0, 1), CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2), UUID.randomUUID(), - new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM)); + new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue())); } private FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException {