Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/expert_forst_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<td>Integer</td>
<td>When the number of eviction that a block in hot link is moved to cold link reaches this value, the block will be blocked from being promoted to the head of the LRU list. The default value is '3'.</td>
</tr>
<tr>
<td><h5>state.backend.forst.checkpoint.transfer.thread.num</h5></td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>
<td>The number of transfer threads used to write or copy files to the state backend.</td>
</tr>
<tr>
<td><h5>state.backend.forst.executor.inline-coordinator</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/forst_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<td>MemorySize</td>
<td>An upper-bound of the size that can be used for cache. User should specify at least one cache size limit to enable the cache, either this option or the 'state.backend.forst.cache.reserve-size' option. They can be set simultaneously, and in this case, cache will grow if meet the requirements of both two options. The default value is '0 bytes', meaning that this option is disabled. </td>
</tr>
<tr>
<td><h5>state.backend.forst.checkpoint.transfer.thread.num</h5></td>
<td style="word-wrap: break-word;">4</td>
<td>Integer</td>
<td>The number of transfer threads used to write or copy files to the state backend.</td>
</tr>
<tr>
<td><h5>state.backend.forst.executor.inline-coordinator</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ private ForStRestoreOperation getForStRestoreOperation(
long lastCompletedCheckpointId) {
ForStStateDataTransfer stateTransfer =
new ForStStateDataTransfer(
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getDataTransferThreadNum(),
optionsContainer.getFileSystem());

if (enableIncrementalCheckpointing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,12 @@ public class ForStOptions {
+ " Only valid when '"
+ EXECUTOR_WRITE_IO_INLINE.key()
+ "' is false.");

@Documentation.Section(Documentation.Sections.EXPERT_FORST)
public static final ConfigOption<Integer> CHECKPOINT_TRANSFER_THREAD_NUM =
Comment thread
francis-a marked this conversation as resolved.
ConfigOptions.key("state.backend.forst.checkpoint.transfer.thread.num")
.intType()
.defaultValue(4)
.withDescription(
"The number of transfer threads used to write or copy files to the state backend.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ public int getWriteIoParallelism() {
return configuration.get(ForStOptions.EXECUTOR_WRITE_IO_PARALLELISM);
}

public int getDataTransferThreadNum() {
return configuration.get(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM);
}

/**
* Prepare local and remote directories.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@
public class ForStStateDataTransfer implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ForStStateDataTransfer.class);

// TODO: Add ConfigOption replace this field after ForSt checkpoint implementation stable
public static final int DEFAULT_THREAD_NUM = 4;

protected final ExecutorService executorService;

@Nullable private final ForStFlinkFileSystem forStFs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public ForStRestoreResult restore() throws Exception {
private void transferAllStateHandles(List<StateHandleTransferSpec> specs) throws Exception {
try (ForStStateDataTransfer transfer =
new ForStStateDataTransfer(
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getDataTransferThreadNum(),
optionsContainer.getFileSystem())) {
transfer.transferAllStateDataToDirectory(
optionsContainer.getPathContainer(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public ForStSyncKeyedStateBackendBuilder<K> setRecoveryClaimMode(

ForStStateDataTransfer stateTransfer =
new ForStStateDataTransfer(
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getDataTransferThreadNum(),
optionsContainer.getFileSystem());

if (enableIncrementalCheckpointing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,20 @@ public void testConfigureQueryTimeAfterNumEntries() throws Exception {
}
}

@Test
public void testConfigureCheckpointTransferThreadNumber() throws Exception {
ForStStateBackend forStStateBackend = new ForStStateBackend();
Configuration configuration = new Configuration();
configuration.setString(
ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.key(), "10");
forStStateBackend = forStStateBackend.configure(configuration, getClass().getClassLoader());

try (ForStResourceContainer resourceContainer =
forStStateBackend.createOptionsAndResourceContainer(null)) {
assertEquals(10, resourceContainer.getDataTransferThreadNum());
}
}

private void verifySetParameter(Runnable setter) {
try {
setter.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.state.forst.ForStExtension;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.testutils.junit.utils.TempDirUtils;

Expand Down Expand Up @@ -199,7 +200,7 @@ private ForStIncrementalSnapshotStrategy<?> createSnapshotStrategy()
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2),
UUID.randomUUID(),
new TreeMap<>(),
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM),
new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()),
-1);
}

Expand Down Expand Up @@ -231,7 +232,7 @@ private ForStNativeFullSnapshotStrategy<?> createFullSnapshotStrategy()
new KeyGroupRange(0, 1),
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(2),
UUID.randomUUID(),
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM));
new ForStStateDataTransfer(ForStOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
}

private FsCheckpointStreamFactory createFsCheckpointStreamFactory() throws IOException {
Expand Down