From d7cd8804acd04dd62e1bc58002147df4a45f9d1b Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Thu, 24 Jul 2025 00:49:03 +0530 Subject: [PATCH 1/3] [FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration --- .../operator/config/FlinkConfigManager.java | 228 +++++++++++++ .../controller/FlinkDeploymentContext.java | 4 +- .../controller/FlinkSessionJobContext.java | 4 +- .../service/AbstractFlinkService.java | 311 +++++++++++++++++- .../operator/service/FlinkService.java | 22 ++ 5 files changed, 566 insertions(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 8a2fd2651d..2b387c09a4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -19,17 +19,24 @@ package org.apache.flink.kubernetes.operator.config; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot; import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec; import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.kubernetes.operator.utils.EnvUtils; import org.apache.flink.kubernetes.operator.utils.FlinkUtils; @@ -41,7 +48,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import io.fabric8.kubernetes.api.model.ObjectMeta; +import lombok.AllArgsConstructor; import lombok.Builder; +import lombok.Data; import lombok.SneakyThrows; import lombok.Value; import org.apache.commons.lang3.ObjectUtils; @@ -50,6 +59,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -89,6 +99,20 @@ public class FlinkConfigManager { private final Consumer> namespaceListener; private volatile ConcurrentHashMap> relevantFlinkVersionPrefixes; + // Job-state-aware cache for runtime configuration overrides + private final Cache> runtimeConfigCache; + + /** Cache key for runtime configuration overrides. 
*/ + @Data + @Builder + @AllArgsConstructor + private static class RuntimeConfigCacheKey { + private final String namespace; + private final String name; + private final String jobId; + private final String updateTime; + } + protected static final Pattern FLINK_VERSION_PATTERN = Pattern.compile( VERSION_CONF_PREFIX.replaceAll("\\.", "\\\\\\.") @@ -133,6 +157,18 @@ public Configuration load(Key k) { } }); + // Initialize runtime configuration cache with similar timeout + this.runtimeConfigCache = + CacheBuilder.newBuilder() + .maximumSize( + defaultConfig.get( + KubernetesOperatorConfigOptions + .OPERATOR_CONFIG_CACHE_SIZE)) // Reasonable limit + // for runtime config + // cache + .expireAfterAccess(cacheTimeout) + .build(); + updateDefaultConfig(defaultConfig); ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleWithFixedDelay( @@ -141,6 +177,13 @@ public Configuration load(Key k) { cacheTimeout.toMillis(), TimeUnit.MILLISECONDS); + // Also clean up runtime config cache + executorService.scheduleWithFixedDelay( + runtimeConfigCache::cleanUp, + cacheTimeout.toMillis(), + cacheTimeout.toMillis(), + TimeUnit.MILLISECONDS); + if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) { scheduleConfigWatcher(executorService); } @@ -352,8 +395,193 @@ public Configuration getObserveConfig(FlinkDeployment deployment) { return conf; } + /** + * Get the observe configuration enhanced with runtime configuration overrides fetched from the + * job. This method fetches the actual runtime configuration from the job via REST API and + * overrides the deployment spec configuration with the actual values. 
+ * + * @param deployment Deployment resource + * @param flinkService FlinkService to use for fetching runtime configuration + * @return Observe config enhanced with runtime configuration overrides + */ + public Configuration getObserveConfigWithRuntimeOverrides( + FlinkDeployment deployment, FlinkService flinkService) { + // Get the base observe configuration + Configuration conf = getObserveConfig(deployment); + + // Apply runtime configuration overrides if job is running + applyRuntimeConfigurationOverrides(deployment, flinkService, conf); + + return conf; + } + + /** + * Get configuration for interacting with session jobs enhanced with runtime configuration + * overrides. This method fetches the actual runtime configuration from the job via REST API and + * overrides the session job spec configuration with the actual values. + * + * @param sessionJob Session job resource + * @param flinkService FlinkService to use for fetching runtime configuration + * @return Session job config enhanced with runtime configuration overrides + */ + public Configuration getSessionJobConfigWithRuntimeOverrides( + FlinkSessionJob sessionJob, FlinkService flinkService, Configuration conf) { + // Apply runtime configuration overrides if job is running + applyRuntimeConfigurationOverrides(sessionJob, flinkService, conf); + + return conf; + } + + /** + * Generic method to apply runtime configuration overrides from any Flink resource to the + * provided configuration. This method fetches the actual runtime configuration from the running + * job and overrides the resource spec configuration with the actual values to ensure the + * operator works with the correct configuration values. + * + *

Uses a job-state-aware cache to ensure runtime configuration is only fetched once per job + * when it reaches RUNNING state. + * + * @param resource Flink resource (FlinkDeployment or FlinkSessionJob) + * @param flinkService FlinkService to use for fetching runtime configuration + * @param conf Configuration to apply overrides to + */ + private > void applyRuntimeConfigurationOverrides( + T resource, FlinkService flinkService, Configuration conf) { + try { + // Check if job is running and has a job ID + var jobStatus = resource.getStatus().getJobStatus(); + if (jobStatus == null || jobStatus.getJobId() == null) { + LOG.debug("No job ID available for runtime configuration fetch"); + return; + } + + JobID jobId = JobID.fromHexString(jobStatus.getJobId()); + JobStatus jobState = jobStatus.getState(); + + // Only fetch runtime config if job is in a running state + if (jobState == null || !jobState.equals(JobStatus.RUNNING)) { + LOG.debug( + "Job {} is not in RUNNING state (current: {}), skipping runtime config fetch", + jobId, + jobState); + return; + } + + // Check if JobManager is ready (only for FlinkDeployment) + if (resource instanceof FlinkDeployment) { + FlinkDeployment deployment = (FlinkDeployment) resource; + if (deployment.getStatus().getJobManagerDeploymentStatus() + != JobManagerDeploymentStatus.READY) { + LOG.debug( + "JobManager is not ready, skipping runtime config fetch for job {}", + jobId); + return; + } + } + + // Create cache key based on job state to ensure we only fetch once per job per state + RuntimeConfigCacheKey cacheKey = + RuntimeConfigCacheKey.builder() + .namespace(resource.getMetadata().getNamespace()) + .name(resource.getMetadata().getName()) + .jobId(jobStatus.getJobId()) + .updateTime(jobStatus.getUpdateTime()) + .build(); + + // Check if runtime config is already cached for this job state + Map runtimeConfig = runtimeConfigCache.getIfPresent(cacheKey); + + if (runtimeConfig == null) { + LOG.debug( + "Runtime configuration not cached 
for job {}, fetching from REST API", + jobId); + + // Fetch both job configuration and checkpoint configuration + runtimeConfig = new HashMap<>(); + + // 1. Fetch job configuration + try { + Map jobConfig = flinkService.getJobConfiguration(conf, jobId); + if (jobConfig != null) { + runtimeConfig.putAll(jobConfig); + LOG.debug( + "Fetched {} job configuration entries for job {}", + jobConfig.size(), + jobId); + LOG.debug("Fetched job configuration: {}", jobConfig); + } else { + LOG.debug("Empty job configuration for job {}", jobId); + } + } catch (Exception e) { + LOG.warn( + "Failed to fetch job configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Job configuration fetch exception details", e); + } + + // 2. Fetch checkpoint configuration and convert to config map + try { + Map checkpointConfig = + flinkService.getJobCheckpointConfiguration(conf, jobId); + if (checkpointConfig != null) { + runtimeConfig.putAll(checkpointConfig); + LOG.debug( + "Fetched {} checkpoint configuration entries for job {}", + checkpointConfig.size(), + jobId); + } + } catch (Exception e) { + LOG.warn( + "Failed to fetch checkpoint configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Checkpoint configuration fetch exception details", e); + } + + // Cache the combined runtime configuration to avoid repeated fetches + runtimeConfigCache.put(cacheKey, runtimeConfig); + LOG.debug( + "Cached runtime configuration for job {} (cache key: {})", jobId, cacheKey); + } else { + LOG.debug("Using cached runtime configuration for job {}", jobId); + } + + // Apply the fetched runtime configuration to the main configuration object + // This overrides the deployment spec configuration with actual runtime values + if (runtimeConfig != null && !runtimeConfig.isEmpty()) { + LOG.info( + "[VALIDATION] Before override, parallelism.default = {}", + conf.get(CoreOptions.DEFAULT_PARALLELISM)); + LOG.info( + "Applying {} runtime configuration overrides 
for job {}", + runtimeConfig.size(), + jobId); + runtimeConfig.forEach(conf::setString); + LOG.info( + "[VALIDATION] After override, parallelism.default = {}", + conf.get(CoreOptions.DEFAULT_PARALLELISM)); + } else { + LOG.debug("No runtime configuration available for job {}", jobId); + } + + } catch (Exception e) { + // Don't fail observe config creation if runtime config fetch fails + // Fall back to deployment spec configuration + LOG.warn( + "Failed to fetch runtime configuration for {} {}, " + + "using deployment spec configuration instead. This may cause " + + "operator decisions to be based on incorrect configuration values.", + resource.getClass().getSimpleName(), + resource.getMetadata().getName(), + e); + } + } + private void addOperatorConfigsFromSpec(AbstractFlinkSpec spec, Configuration conf) { + // Observe config should include the latest operator related settings + if (spec.getFlinkConfiguration() != null) { spec.getFlinkConfiguration() .forEach( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java index 2944217836..2cc1bd569a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java @@ -59,7 +59,9 @@ public Configuration getDeployConfig(AbstractFlinkSpec spec) { @Override protected Configuration createObserveConfig() { - return configManager.getObserveConfig(getResource()); + // Use enhanced observe config that fetches runtime configuration from the job + // and overrides deployment spec configuration with actual runtime values + return configManager.getObserveConfigWithRuntimeOverrides(getResource(), getFlinkService()); } @Override diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java index f0256b31f4..506224f494 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java @@ -77,7 +77,9 @@ protected FlinkService createFlinkService() { @Override protected Configuration createObserveConfig() { - return getDeployConfig(getResource().getSpec()); + Configuration conf = getDeployConfig(getResource().getSpec()); + return configManager.getSessionJobConfigWithRuntimeOverrides( + getResource(), getFlinkService(), conf); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 211f2e9dc4..440c7b27c5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -66,10 +66,14 @@ import org.apache.flink.runtime.rest.messages.DashboardConfiguration; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobConfigInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; import 
org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.TriggerId; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders; @@ -170,6 +174,14 @@ public abstract class AbstractFlinkService implements FlinkService { public static final String FIELD_NAME_TOTAL_CPU = "total-cpu"; public static final String FIELD_NAME_TOTAL_MEMORY = "total-memory"; public static final String FIELD_NAME_STATE_SIZE = "state-size"; + private static final com.fasterxml.jackson.databind.ObjectMapper objectMapper = + new com.fasterxml.jackson.databind.ObjectMapper(); + + static { + objectMapper.setVisibility( + com.fasterxml.jackson.annotation.PropertyAccessor.FIELD, + com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.ANY); + } protected final KubernetesClient kubernetesClient; protected final ExecutorService executorService; @@ -567,7 +579,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { .getExternalPointer() .equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) { throw new UpgradeFailureException( - "Latest checkpoint not externally addressable, manual recovery required.", + "Latest checkpoint not externally addressable, Manual restore required.", "CheckpointNotFound"); } return latestCheckpointOpt.map( @@ -870,6 +882,303 @@ public JobExceptionsInfoWithHistory getJobExceptions( } } + @Override + public Map getJobConfiguration(Configuration conf, JobID jobId) + throws Exception { + LOG.debug("Fetching job configuration for job {}", jobId); + try (var clusterClient = getClusterClient(conf)) { + Map jobConfig = new HashMap<>(); + + // Use JobConfigHeaders to get job configuration directly + var jobConfigHeaders = 
JobConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + + try { + JobConfigInfo configurationInfo = + clusterClient + .sendRequest( + jobConfigHeaders, + parameters, + EmptyRequestBody.getInstance()) + .get( + operatorConfig.getFlinkClientTimeout().toSeconds(), + TimeUnit.SECONDS); + LOG.debug("Job Config Info: {}", configurationInfo); + + // Extract configuration from the response + if (configurationInfo != null) { + LOG.debug("Job Configuration Info: {}", configurationInfo); + if (configurationInfo.getExecutionConfigInfo() != null) { + jobConfig.put( + "parallelism.default", + String.valueOf( + configurationInfo + .getExecutionConfigInfo() + .getParallelism())); + // Correct this as restart-strategy is incorect + // if (!Objects.equals( + // + // configurationInfo.getExecutionConfigInfo().getRestartStrategy(), + // "Cluster level default restart strategy")) + // { + // jobConfig.put( + // "restart-strategy", + // configurationInfo + // .getExecutionConfigInfo() + // .getRestartStrategy()); + // } + jobConfig.put( + "pipeline.object-reuse", + String.valueOf( + configurationInfo + .getExecutionConfigInfo() + .isObjectReuse())); + jobConfig.putAll( + configurationInfo + .getExecutionConfigInfo() + .getGlobalJobParameters()); + } + + LOG.debug("Fetched {} job configuration entries", jobConfig.size()); + } else { + LOG.warn("Job configuration is null for job {}", jobId); + } + + } catch (Exception e) { + LOG.warn( + "Failed to fetch job configuration for job {} via REST API: {}", + jobId, + e.getMessage()); + LOG.debug("Job configuration fetch exception details", e); + } + + LOG.debug( + "Fetched {} configuration entries for job {}: {}", + jobConfig.size(), + jobId, + jobConfig); + return jobConfig; + } + } + + @Override + public Map getJobCheckpointConfiguration(Configuration conf, JobID jobId) + throws Exception { + LOG.debug("Fetching checkpoint configuration for job {}", jobId); + try (var 
clusterClient = getClusterClient(conf)) { + var checkpointConfigHeaders = CheckpointConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + + try { + var checkpointConfigInfo = + clusterClient + .sendRequest( + checkpointConfigHeaders, + parameters, + EmptyRequestBody.getInstance()) + .get( + operatorConfig.getFlinkClientTimeout().toSeconds(), + TimeUnit.SECONDS); + + // Convert the REST API response to a Map to handle nested objects + Map rawResponse = + objectMapper.convertValue( + checkpointConfigInfo, + new com.fasterxml.jackson.core.type.TypeReference< + Map>() {}); + + LOG.debug( + "Raw checkpoint configuration response for job {}: {}", jobId, rawResponse); + + // Map the JSON response fields to correct Flink configuration keys + Map mappedConfig = new HashMap<>(); + + // Map checkpoint mode + if (rawResponse.containsKey("processingMode")) { + String mode = String.valueOf(rawResponse.get("processingMode")); + if ("exactly_once".equals(mode)) { + mappedConfig.put("execution.checkpointing.mode", "EXACTLY_ONCE"); + } else if ("at_least_once".equals(mode)) { + mappedConfig.put("execution.checkpointing.mode", "AT_LEAST_ONCE"); + } + } + + // Map checkpoint interval (from milliseconds to duration string) + if (rawResponse.containsKey("checkpointInterval")) { + String intervalMs = String.valueOf(rawResponse.get("checkpointInterval")); + mappedConfig.put("execution.checkpointing.interval", intervalMs + "ms"); + } + + // Map checkpoint timeout (from milliseconds to duration string) + if (rawResponse.containsKey("checkpointTimeout")) { + String timeoutMs = String.valueOf(rawResponse.get("checkpointTimeout")); + mappedConfig.put("execution.checkpointing.timeout", timeoutMs + "ms"); + } + + // Map min pause between checkpoints (from milliseconds to duration string) + if (rawResponse.containsKey("minPauseBetweenCheckpoints")) { + String minPauseMs = + 
String.valueOf(rawResponse.get("minPauseBetweenCheckpoints")); + mappedConfig.put("execution.checkpointing.min-pause", minPauseMs + "ms"); + } + + // Map max concurrent checkpoints + if (rawResponse.containsKey("maxConcurrentCheckpoints")) { + mappedConfig.put( + "execution.checkpointing.max-concurrent-checkpoints", + String.valueOf(rawResponse.get("maxConcurrentCheckpoints"))); + } + + // Map tolerable failed checkpoints + if (rawResponse.containsKey("tolerableFailedCheckpoints")) { + mappedConfig.put( + "execution.checkpointing.tolerable-failed-checkpoints", + String.valueOf(rawResponse.get("tolerableFailedCheckpoints"))); + } + + // Map externalized checkpoint retention + if (rawResponse.containsKey("externalizedCheckpointInfo")) { + Object externalizationObj = rawResponse.get("externalizedCheckpointInfo"); + if (externalizationObj instanceof Map) { + @SuppressWarnings("unchecked") + Map externalization = + (Map) externalizationObj; + Boolean enabled = (Boolean) externalization.get("enabled"); + Boolean deleteOnCancellation = + (Boolean) externalization.get("deleteOnCancellation"); + + if (Boolean.TRUE.equals(enabled)) { + if (Boolean.TRUE.equals(deleteOnCancellation)) { + mappedConfig.put( + "execution.checkpointing.externalized-checkpoint-retention", + "DELETE_ON_CANCELLATION"); + } else { + mappedConfig.put( + "execution.checkpointing.externalized-checkpoint-retention", + "RETAIN_ON_CANCELLATION"); + } + } else { + mappedConfig.put( + "execution.checkpointing.externalized-checkpoint-retention", + "NO_EXTERNALIZED_CHECKPOINTS"); + } + } + } + + // Map state backend + if (rawResponse.containsKey("stateBackend")) { + String stateBackend = String.valueOf(rawResponse.get("stateBackend")); + if ("EmbeddedRocksDBStateBackend".equals(stateBackend)) { + mappedConfig.put("state.backend.type", "rocksdb"); + } else if ("HashMapStateBackend".equals(stateBackend)) { + mappedConfig.put("state.backend.type", "hashmap"); + } else { + mappedConfig.put("state.backend.type", 
stateBackend.toLowerCase()); + } + } + + // Map checkpoint storage + if (rawResponse.containsKey("checkpointStorage")) { + String checkpointStorage = String.valueOf(rawResponse.get("checkpointStorage")); + if ("FileSystemCheckpointStorage".equals(checkpointStorage)) { + mappedConfig.put("execution.checkpointing.storage", "filesystem"); + } else if ("JobManagerCheckpointStorage".equals(checkpointStorage)) { + mappedConfig.put("execution.checkpointing.storage", "jobmanager"); + } else { + mappedConfig.put( + "execution.checkpointing.storage", checkpointStorage.toLowerCase()); + } + } + + // Map unaligned checkpoints + if (rawResponse.containsKey("unalignedCheckpoints")) { + mappedConfig.put( + "execution.checkpointing.unaligned.enabled", + String.valueOf(rawResponse.get("unalignedCheckpoints"))); + } + + // Map aligned checkpoint timeout (from milliseconds to duration string) + if (rawResponse.containsKey("alignedCheckpointTimeout")) { + String alignedTimeoutMs = + String.valueOf(rawResponse.get("alignedCheckpointTimeout")); + mappedConfig.put( + "execution.checkpointing.aligned-checkpoint-timeout", + alignedTimeoutMs + "ms"); + } + + // Map checkpoints after tasks finish + if (rawResponse.containsKey("checkpointsWithFinishedTasks")) { + mappedConfig.put( + "execution.checkpointing.checkpoints-after-tasks-finish", + String.valueOf(rawResponse.get("checkpointsWithFinishedTasks"))); + } + + // Map state changelog settings + if (rawResponse.containsKey("stateChangelog")) { + mappedConfig.put( + "state.changelog.enabled", + String.valueOf(rawResponse.get("stateChangelog"))); + } + + if (rawResponse.containsKey("periodicMaterializationInterval")) { + String intervalMs = + String.valueOf(rawResponse.get("periodicMaterializationInterval")); + mappedConfig.put( + "state.changelog.periodic-materialize.interval", intervalMs + "ms"); + } + + if (rawResponse.containsKey("changelogStorage")) { + mappedConfig.put( + "state.changelog.storage", + 
String.valueOf(rawResponse.get("changelogStorage"))); + } + + LOG.debug( + "Mapped {} checkpoint configuration entries for job {}: {}", + mappedConfig.size(), + jobId, + mappedConfig); + + return mappedConfig; + + } catch (Exception e) { + LOG.warn( + "Failed to fetch checkpoint configuration for job {} via REST API, falling back to job config logic.", + jobId, + e); + + if (e.getMessage() != null && e.getMessage().contains("404")) { + throw new RuntimeException(e); + } + // We fallback to this logic, because the checkpoint config query is not available + // int pre Flink 1.15 + return getJobConfigFromRest(clusterClient, jobId).entrySet().stream() + .filter( + entry -> + entry.getKey().startsWith("execution.checkpointing.") + || entry.getKey() + .startsWith("state.backend.changelog")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + } + } + + private Map getJobConfigFromRest( + RestClusterClient clusterClient, JobID jobId) throws Exception { + var jobConfigHeaders = JobConfigHeaders.getInstance(); + var parameters = new JobMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + var jobConfigInfo = + clusterClient + .sendRequest(jobConfigHeaders, parameters, EmptyRequestBody.getInstance()) + .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS); + return objectMapper.convertValue( + jobConfigInfo, + new com.fasterxml.jackson.core.type.TypeReference>() {}); + } + @VisibleForTesting protected void runJar( JobSpec job, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java index b1a078fbb2..d13fc06830 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java @@ -133,6 +133,28 @@ 
JobExceptionsInfoWithHistory getJobExceptions( AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) throws Exception; + /** + * Fetches the actual runtime configuration of a job from Flink REST API. This is used to get + * the real configuration after programmatic overrides. + * + * @param conf Configuration for REST client + * @param jobId Job ID to fetch configuration for + * @return Map of configuration key-value pairs + * @throws Exception if REST call fails + */ + Map getJobConfiguration(Configuration conf, JobID jobId) throws Exception; + + /** + * Fetches the checkpoint configuration of a job from Flink REST API. + * + * @param conf Configuration for REST client + * @param jobId Job ID to fetch checkpoint configuration for + * @return Map of checkpoint configuration key-value pairs + * @throws Exception if REST call fails + */ + Map getJobCheckpointConfiguration(Configuration conf, JobID jobId) + throws Exception; + /** Result of a cancel operation. */ @AllArgsConstructor class CancelResult { From e87421cad5fd5ecc43b1cd5b49540b96fb93800c Mon Sep 17 00:00:00 2001 From: nishita-pattanayak Date: Thu, 24 Jul 2025 01:33:38 +0530 Subject: [PATCH 2/3] [FLINK-35746][Kubernetes-Operator] Add getJobConfiguration and getJobCheckpointConfiguration --- .../service/AbstractFlinkService.java | 303 +++++++++--------- 1 file changed, 149 insertions(+), 154 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 440c7b27c5..d79964ebbc 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -149,6 +149,7 @@ import java.util.Map; import java.util.Objects; import 
java.util.Optional; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -962,6 +963,32 @@ public Map getJobConfiguration(Configuration conf, JobID jobId) } } + // Constants for checkpoint configuration field mappings + private static final Map CHECKPOINT_FIELD_MAPPINGS = + Map.of( + "processingMode", "execution.checkpointing.mode", + "checkpointInterval", "execution.checkpointing.interval", + "checkpointTimeout", "execution.checkpointing.timeout", + "minPauseBetweenCheckpoints", "execution.checkpointing.min-pause", + "maxConcurrentCheckpoints", + "execution.checkpointing.max-concurrent-checkpoints", + "tolerableFailedCheckpoints", + "execution.checkpointing.tolerable-failed-checkpoints", + "unalignedCheckpoints", "execution.checkpointing.unaligned.enabled", + "alignedCheckpointTimeout", + "execution.checkpointing.aligned-checkpoint-timeout", + "checkpointsWithFinishedTasks", + "execution.checkpointing.checkpoints-after-tasks-finish", + "stateChangelog", "state.changelog.enabled"); + + private static final Set DURATION_FIELDS = + Set.of( + "checkpointInterval", + "checkpointTimeout", + "minPauseBetweenCheckpoints", + "alignedCheckpointTimeout", + "periodicMaterializationInterval"); + @Override public Map getJobCheckpointConfiguration(Configuration conf, JobID jobId) throws Exception { @@ -982,7 +1009,7 @@ public Map getJobCheckpointConfiguration(Configuration conf, Job operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS); - // Convert the REST API response to a Map to handle nested objects + // Convert response to handle nested objects Map rawResponse = objectMapper.convertValue( checkpointConfigInfo, @@ -992,176 +1019,144 @@ public Map getJobCheckpointConfiguration(Configuration conf, Job LOG.debug( "Raw checkpoint configuration response for job {}: {}", jobId, rawResponse); - // Map the JSON response fields to correct Flink configuration keys 
- Map mappedConfig = new HashMap<>(); + return mapCheckpointConfiguration(rawResponse, jobId); - // Map checkpoint mode - if (rawResponse.containsKey("processingMode")) { - String mode = String.valueOf(rawResponse.get("processingMode")); - if ("exactly_once".equals(mode)) { - mappedConfig.put("execution.checkpointing.mode", "EXACTLY_ONCE"); - } else if ("at_least_once".equals(mode)) { - mappedConfig.put("execution.checkpointing.mode", "AT_LEAST_ONCE"); - } - } - - // Map checkpoint interval (from milliseconds to duration string) - if (rawResponse.containsKey("checkpointInterval")) { - String intervalMs = String.valueOf(rawResponse.get("checkpointInterval")); - mappedConfig.put("execution.checkpointing.interval", intervalMs + "ms"); - } + } catch (Exception e) { + LOG.warn( + "Failed to fetch checkpoint configuration for job {} via REST API, falling back to job config logic.", + jobId, + e); - // Map checkpoint timeout (from milliseconds to duration string) - if (rawResponse.containsKey("checkpointTimeout")) { - String timeoutMs = String.valueOf(rawResponse.get("checkpointTimeout")); - mappedConfig.put("execution.checkpointing.timeout", timeoutMs + "ms"); + if (e.getCause() instanceof RestClientException) { + RestClientException restException = (RestClientException) e.getCause(); + if (restException.getHttpResponseStatus() == HttpResponseStatus.NOT_FOUND) { + throw new RuntimeException("Job not found: " + jobId, e); + } } - // Map min pause between checkpoints (from milliseconds to duration string) - if (rawResponse.containsKey("minPauseBetweenCheckpoints")) { - String minPauseMs = - String.valueOf(rawResponse.get("minPauseBetweenCheckpoints")); - mappedConfig.put("execution.checkpointing.min-pause", minPauseMs + "ms"); - } + // Fallback for pre-Flink 1.15 versions + return getJobConfigFromRest(clusterClient, jobId).entrySet().stream() + .filter( + entry -> + entry.getKey().startsWith("execution.checkpointing.") + || entry.getKey() + 
.startsWith("state.backend.changelog")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + } + } - // Map max concurrent checkpoints - if (rawResponse.containsKey("maxConcurrentCheckpoints")) { - mappedConfig.put( - "execution.checkpointing.max-concurrent-checkpoints", - String.valueOf(rawResponse.get("maxConcurrentCheckpoints"))); - } + private Map mapCheckpointConfiguration( + Map rawResponse, JobID jobId) { + Map mappedConfig = new HashMap<>(); - // Map tolerable failed checkpoints - if (rawResponse.containsKey("tolerableFailedCheckpoints")) { - mappedConfig.put( - "execution.checkpointing.tolerable-failed-checkpoints", - String.valueOf(rawResponse.get("tolerableFailedCheckpoints"))); - } + // Handle simple field mappings + CHECKPOINT_FIELD_MAPPINGS.forEach( + (jsonField, configKey) -> { + if (rawResponse.containsKey(jsonField)) { + String value = String.valueOf(rawResponse.get(jsonField)); - // Map externalized checkpoint retention - if (rawResponse.containsKey("externalizedCheckpointInfo")) { - Object externalizationObj = rawResponse.get("externalizedCheckpointInfo"); - if (externalizationObj instanceof Map) { - @SuppressWarnings("unchecked") - Map externalization = - (Map) externalizationObj; - Boolean enabled = (Boolean) externalization.get("enabled"); - Boolean deleteOnCancellation = - (Boolean) externalization.get("deleteOnCancellation"); - - if (Boolean.TRUE.equals(enabled)) { - if (Boolean.TRUE.equals(deleteOnCancellation)) { - mappedConfig.put( - "execution.checkpointing.externalized-checkpoint-retention", - "DELETE_ON_CANCELLATION"); - } else { - mappedConfig.put( - "execution.checkpointing.externalized-checkpoint-retention", - "RETAIN_ON_CANCELLATION"); - } - } else { - mappedConfig.put( - "execution.checkpointing.externalized-checkpoint-retention", - "NO_EXTERNALIZED_CHECKPOINTS"); + // Special handling for processing mode + if ("processingMode".equals(jsonField)) { + value = mapProcessingMode(value); } - } - } - // Map state 
backend - if (rawResponse.containsKey("stateBackend")) { - String stateBackend = String.valueOf(rawResponse.get("stateBackend")); - if ("EmbeddedRocksDBStateBackend".equals(stateBackend)) { - mappedConfig.put("state.backend.type", "rocksdb"); - } else if ("HashMapStateBackend".equals(stateBackend)) { - mappedConfig.put("state.backend.type", "hashmap"); - } else { - mappedConfig.put("state.backend.type", stateBackend.toLowerCase()); - } - } + // Add duration suffix for time-based fields + if (DURATION_FIELDS.contains(jsonField)) { + value += "ms"; + } - // Map checkpoint storage - if (rawResponse.containsKey("checkpointStorage")) { - String checkpointStorage = String.valueOf(rawResponse.get("checkpointStorage")); - if ("FileSystemCheckpointStorage".equals(checkpointStorage)) { - mappedConfig.put("execution.checkpointing.storage", "filesystem"); - } else if ("JobManagerCheckpointStorage".equals(checkpointStorage)) { - mappedConfig.put("execution.checkpointing.storage", "jobmanager"); - } else { - mappedConfig.put( - "execution.checkpointing.storage", checkpointStorage.toLowerCase()); + mappedConfig.put(configKey, value); } - } - - // Map unaligned checkpoints - if (rawResponse.containsKey("unalignedCheckpoints")) { - mappedConfig.put( - "execution.checkpointing.unaligned.enabled", - String.valueOf(rawResponse.get("unalignedCheckpoints"))); - } - - // Map aligned checkpoint timeout (from milliseconds to duration string) - if (rawResponse.containsKey("alignedCheckpointTimeout")) { - String alignedTimeoutMs = - String.valueOf(rawResponse.get("alignedCheckpointTimeout")); - mappedConfig.put( - "execution.checkpointing.aligned-checkpoint-timeout", - alignedTimeoutMs + "ms"); - } - - // Map checkpoints after tasks finish - if (rawResponse.containsKey("checkpointsWithFinishedTasks")) { - mappedConfig.put( - "execution.checkpointing.checkpoints-after-tasks-finish", - String.valueOf(rawResponse.get("checkpointsWithFinishedTasks"))); - } - - // Map state changelog settings - 
if (rawResponse.containsKey("stateChangelog")) { - mappedConfig.put( - "state.changelog.enabled", - String.valueOf(rawResponse.get("stateChangelog"))); - } + }); + + // Handle complex nested mappings + mapExternalizedCheckpointInfo(rawResponse, mappedConfig); + mapStateBackend(rawResponse, mappedConfig); + mapCheckpointStorage(rawResponse, mappedConfig); + mapStateChangelog(rawResponse, mappedConfig); + + LOG.debug( + "Mapped {} checkpoint configuration entries for job {}: {}", + mappedConfig.size(), + jobId, + mappedConfig); + return mappedConfig; + } - if (rawResponse.containsKey("periodicMaterializationInterval")) { - String intervalMs = - String.valueOf(rawResponse.get("periodicMaterializationInterval")); - mappedConfig.put( - "state.changelog.periodic-materialize.interval", intervalMs + "ms"); - } + private String mapProcessingMode(String mode) { + if ("exactly_once".equals(mode)) { + return "EXACTLY_ONCE"; + } else if ("at_least_once".equals(mode)) { + return "AT_LEAST_ONCE"; + } else { + return mode.toUpperCase(); + } + } - if (rawResponse.containsKey("changelogStorage")) { - mappedConfig.put( - "state.changelog.storage", - String.valueOf(rawResponse.get("changelogStorage"))); - } + private void mapExternalizedCheckpointInfo( + Map rawResponse, Map mappedConfig) { + Object externalizationObj = rawResponse.get("externalizedCheckpointInfo"); + if (externalizationObj instanceof Map) { + @SuppressWarnings("unchecked") + Map externalization = (Map) externalizationObj; + Boolean enabled = (Boolean) externalization.get("enabled"); + Boolean deleteOnCancellation = (Boolean) externalization.get("deleteOnCancellation"); + + String retention = "NO_EXTERNALIZED_CHECKPOINTS"; + if (Boolean.TRUE.equals(enabled)) { + retention = + Boolean.TRUE.equals(deleteOnCancellation) + ? 
"DELETE_ON_CANCELLATION" + : "RETAIN_ON_CANCELLATION"; + } + mappedConfig.put( + "execution.checkpointing.externalized-checkpoint-retention", retention); + } + } - LOG.debug( - "Mapped {} checkpoint configuration entries for job {}: {}", - mappedConfig.size(), - jobId, - mappedConfig); + private void mapStateBackend( + Map rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("stateBackend")) { + String stateBackend = String.valueOf(rawResponse.get("stateBackend")); + String backendType; + if ("EmbeddedRocksDBStateBackend".equals(stateBackend)) { + backendType = "rocksdb"; + } else if ("HashMapStateBackend".equals(stateBackend)) { + backendType = "hashmap"; + } else { + backendType = stateBackend.toLowerCase(); + } + mappedConfig.put("state.backend.type", backendType); + } + } - return mappedConfig; + private void mapCheckpointStorage( + Map rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("checkpointStorage")) { + String checkpointStorage = String.valueOf(rawResponse.get("checkpointStorage")); + String storageType; + if ("FileSystemCheckpointStorage".equals(checkpointStorage)) { + storageType = "filesystem"; + } else if ("JobManagerCheckpointStorage".equals(checkpointStorage)) { + storageType = "jobmanager"; + } else { + storageType = checkpointStorage.toLowerCase(); + } + mappedConfig.put("execution.checkpointing.storage", storageType); + } + } - } catch (Exception e) { - LOG.warn( - "Failed to fetch checkpoint configuration for job {} via REST API, falling back to job config logic.", - jobId, - e); + private void mapStateChangelog( + Map rawResponse, Map mappedConfig) { + if (rawResponse.containsKey("periodicMaterializationInterval")) { + String intervalMs = String.valueOf(rawResponse.get("periodicMaterializationInterval")); + mappedConfig.put("state.changelog.periodic-materialize.interval", intervalMs + "ms"); + } - if (e.getMessage() != null && e.getMessage().contains("404")) { - throw new RuntimeException(e); - } - // We fallback to 
this logic, because the checkpoint config query is not available - // int pre Flink 1.15 - return getJobConfigFromRest(clusterClient, jobId).entrySet().stream() - .filter( - entry -> - entry.getKey().startsWith("execution.checkpointing.") - || entry.getKey() - .startsWith("state.backend.changelog")) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } + if (rawResponse.containsKey("changelogStorage")) { + mappedConfig.put( + "state.changelog.storage", String.valueOf(rawResponse.get("changelogStorage"))); } } From 40e1cd7c858573c6e1045681246848a255bdebca Mon Sep 17 00:00:00 2001 From: nishita-09 <74376090+nishita-09@users.noreply.github.com> Date: Sun, 27 Jul 2025 16:18:28 +0530 Subject: [PATCH 3/3] Update FlinkConfigManager.java --- .../flink/kubernetes/operator/config/FlinkConfigManager.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java index 2b387c09a4..7ccf1b69a6 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java @@ -163,9 +163,7 @@ public Configuration load(Key k) { .maximumSize( defaultConfig.get( KubernetesOperatorConfigOptions - .OPERATOR_CONFIG_CACHE_SIZE)) // Reasonable limit - // for runtime config - // cache + .OPERATOR_CONFIG_CACHE_SIZE)) .expireAfterAccess(cacheTimeout) .build();