diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 492615f402..d8993992df 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -48,6 +48,7 @@ import java.util.SortedMap; import static org.apache.flink.autoscaler.JobVertexScaler.KeyGroupOrPartitionsAdjustMode.MAXIMIZE_UTILISATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.ALL_MAXIMIZE_UTILISATION_USE_MAX_PARALLELISM; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; import static org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED; @@ -548,6 +549,9 @@ protected static > int scale( KeyGroupOrPartitionsAdjustMode mode = context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE); + var allowMaximizeUtilisationUseMaxParallelism = + context.getConfiguration().get(ALL_MAXIMIZE_UTILISATION_USE_MAX_PARALLELISM); + // When the shuffle type of vertex inputs contains keyBy or vertex is a source, // we try to adjust the parallelism such that it divides // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks @@ -557,8 +561,10 @@ protected static > int scale( // When Mode is MAXIMIZE_UTILISATION , Try to find the smallest parallelism // that can satisfy the current consumption rate. (mode == MAXIMIZE_UTILISATION - && numKeyGroupsOrPartitions / p - < numKeyGroupsOrPartitions / newParallelism)) { + && (numKeyGroupsOrPartitions / p + < numKeyGroupsOrPartitions / newParallelism + || (allowMaximizeUtilisationUseMaxParallelism + && newParallelism == maxParallelism)))) { return p; } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index a67bfd505c..6dd29ce970 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -383,6 +383,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy"); + public static final ConfigOption ALL_MAXIMIZE_UTILISATION_USE_MAX_PARALLELISM = + autoScalerConfig("maximize-utilization.use-max-parallelism") + .booleanType() + .defaultValue(false) + .withFallbackKeys( + oldOperatorConfigKey("maximize-utilization.use-max-parallelism")) + .withDescription( + "When use maximize-utilization allow use maximum parallelism for all operators."); + public static final ConfigOption OBSERVED_SCALABILITY_ENABLED = autoScalerConfig("observed-scalability.enabled") .booleanType()