From 49cdc9ff4aaa51c6773e87e50ec01fa713750617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E8=B4=B5=E5=8F=91?= Date: Wed, 3 Sep 2025 11:32:39 +0800 Subject: [PATCH 1/2] [dinky-admin] Fix issue where a job is still running but Dinky shows its status as failed or unknown --- .../main/java/org/dinky/init/SystemInit.java | 4 + .../java/org/dinky/job/RecheckJobTask.java | 126 ++++++++++++++++++ .../dinky/job/handler/JobRefreshHandler.java | 43 ++++++ .../org/dinky/mapper/JobInstanceMapper.java | 3 + .../org/dinky/service/JobInstanceService.java | 2 + .../service/impl/JobInstanceServiceImpl.java | 5 + .../services/org.dinky.daemon.task.DaemonTask | 3 +- .../resources/mapper/JobInstanceMapper.xml | 13 ++ 8 files changed, 198 insertions(+), 1 deletion(-) create mode 100644 dinky-admin/src/main/java/org/dinky/job/RecheckJobTask.java diff --git a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java index a2c04c7840..e1f8e484dc 100644 --- a/dinky-admin/src/main/java/org/dinky/init/SystemInit.java +++ b/dinky-admin/src/main/java/org/dinky/init/SystemInit.java @@ -36,6 +36,7 @@ import org.dinky.function.pool.UdfCodePool; import org.dinky.job.ClearJobHistoryTask; import org.dinky.job.FlinkJobTask; +import org.dinky.job.RecheckJobTask; import org.dinky.resource.BaseResourceManager; import org.dinky.scheduler.client.ProjectClient; import org.dinky.scheduler.exception.SchedulerException; @@ -160,6 +161,9 @@ private void initDaemon() { DaemonTask clearJobHistoryTask = DaemonTask.build(new DaemonTaskConfig(ClearJobHistoryTask.TYPE)); schedule.addSchedule(clearJobHistoryTask, new PeriodicTrigger(1, TimeUnit.HOURS)); + DaemonTask recheckJobTask = DaemonTask.build(new DaemonTaskConfig(RecheckJobTask.TYPE)); + schedule.addSchedule(recheckJobTask, new PeriodicTrigger(5, TimeUnit.MINUTES)); + // Add flink running job task to flink job thread pool List jobInstances = jobInstanceService.listJobInstanceActive(); FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance(); diff --git a/dinky-admin/src/main/java/org/dinky/job/RecheckJobTask.java b/dinky-admin/src/main/java/org/dinky/job/RecheckJobTask.java new file mode 100644 index 0000000000..e7d1c2e2ad --- /dev/null +++ b/dinky-admin/src/main/java/org/dinky/job/RecheckJobTask.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.job; + +import org.dinky.api.FlinkAPI; +import org.dinky.context.SpringContextUtils; +import org.dinky.daemon.pool.FlinkJobThreadPool; +import org.dinky.daemon.task.DaemonTask; +import org.dinky.daemon.task.DaemonTaskConfig; +import org.dinky.data.enums.JobStatus; +import org.dinky.data.model.ext.JobInfoDetail; +import org.dinky.data.model.job.JobInstance; +import org.dinky.service.JobInstanceService; + +import java.util.List; +import java.util.Optional; + +import org.springframework.context.annotation.DependsOn; + +import com.fasterxml.jackson.databind.JsonNode; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@DependsOn("springContextUtils") +@Slf4j +@Data +public class RecheckJobTask implements DaemonTask { + + public static final String TYPE = RecheckJobTask.class.toString(); + + private DaemonTaskConfig config; + + private static JobInstanceService jobInstanceService; + + static { + jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class); + } + + @Override + public DaemonTask setConfig(DaemonTaskConfig config) { + this.config = config; + return this; + } + + @Override + public DaemonTaskConfig getConfig() { + return config; + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public boolean dealTask() { + // Since flink-operator supports task redeployment and automatic recovery of failed jobs, + // we need to recheck the abnormal jobs here. If the job status has recovered to normal, + // put it back into the task monitoring queue. + log.info("Starting recheck of job instances..."); + + List jobInstances = jobInstanceService.listJobInstancesToRecheck(); + log.info("Found {} job instances to recheck", jobInstances.size()); + + FlinkJobThreadPool flinkJobThreadPool = FlinkJobThreadPool.getInstance(); + for (JobInstance jobInstance : jobInstances) { + log.info( + "Rechecking job instance: id={}, name={}, taskId={}", + jobInstance.getId(), + jobInstance.getName(), + jobInstance.getTaskId()); + JobInfoDetail jobInfoDetail = jobInstanceService.getJobInfoDetail(jobInstance.getId()); + Optional newStatus = this.recheckJobInstanceStatus(jobInfoDetail); + log.info("Job '{}' status after recheck: {}", jobInstance.getName(), newStatus.orElse(null)); + if (newStatus.isPresent() && newStatus.get() == JobStatus.RUNNING) { + log.info("Job '{}' is RUNNING again, re-adding to monitoring queue", jobInstance.getName()); + DaemonTaskConfig config = + DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId(), jobInstance.getTaskId()); + DaemonTask daemonTask = DaemonTask.build(config); + flinkJobThreadPool.execute(daemonTask); + } + } + log.info("Job recheck completed."); + return true; + } + + private Optional recheckJobInstanceStatus(JobInfoDetail jobInfoDetail) { + try { + String jmHost = jobInfoDetail.getClusterInstance().getJobManagerHost(); + log.info("Querying job status from JobManager host: {}", jmHost); + List jobs = FlinkAPI.build(jmHost).listJobs(); + if (jobs == null || jobs.isEmpty()) { + log.warn("No jobs found on JobManager host: {}", jmHost); + return Optional.empty(); + } + JsonNode firstJob = jobs.get(0); + log.debug("Job response: {}", firstJob.toString()); + String newStatus = firstJob.get("state").asText(); + log.info("Fetched job state: {}", newStatus); + return Optional.of(JobStatus.valueOf(newStatus)); + } catch (Exception e) { + log.warn( + "Failed to fetch job status, task: {}", + jobInfoDetail.getInstance().getName()); + return Optional.empty(); + } + } +} diff --git a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java index 348c88cdf1..541a3b331a 100644 --- a/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java +++ b/dinky-admin/src/main/java/org/dinky/job/handler/JobRefreshHandler.java @@ -124,6 +124,8 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) checkAndRefreshCluster(jobInfoDetail); + checkAndRefreshJobId(jobInfoDetail); + // Update the value of JobData from the flink api while ignoring the null value to prevent // some other configuration from being overwritten BeanUtil.copyProperties( @@ -225,6 +227,47 @@ public static boolean refreshJob(JobInfoDetail jobInfoDetail, boolean needSave) return isDone; } + /** + * In Flink operator mode, resource scaling triggers a job redeployment, which results in a new job ID. + * The system will update to the latest job ID accordingly. + * + * + * @param jobInfoDetail The job info detail. + */ + public static void checkAndRefreshJobId(JobInfoDetail jobInfoDetail) { + if (!GatewayType.get(jobInfoDetail.getClusterInstance().getType()).isKubernetesApplicationMode()) { + return; + } + + List jobs = FlinkAPI.build(jobInfoDetail.getClusterInstance().getJobManagerHost()) + .listJobs(); + if (jobs == null || jobs.isEmpty()) { + log.info( + "No running jobs found on task: {}", + jobInfoDetail.getClusterInstance().getJobManagerHost()); + return; + } + + JsonNode firstJob = jobs.stream().findFirst().orElse(jobs.get(0)); + String latestJobId = firstJob.get("jid").asText(); + String currentJobId = jobInfoDetail.getInstance().getJid(); + if (!latestJobId.equals(currentJobId)) { + JobInstance jobInstance = jobInfoDetail.getInstance(); + jobInstance.setJid(latestJobId); + jobInstanceService.updateById(jobInstance); + log.info( + "JobId for [{}] has been refreshed: {} -> {}", + jobInfoDetail.getInstance().getName(), + currentJobId, + latestJobId); + } else { + log.debug( + "JobId for [{}] is up to date: {}", + jobInfoDetail.getInstance().getName(), + currentJobId); + } + } + /** * Retrieves job history. * getJobStatusInformationFromFlinkRestAPI diff --git a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java index 55fd072a3e..3274275821 100644 --- a/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java +++ b/dinky-admin/src/main/java/org/dinky/mapper/JobInstanceMapper.java @@ -49,6 +49,9 @@ public interface JobInstanceMapper extends SuperMapper { @InterceptorIgnore(tenantLine = "true") List listJobInstanceActive(); + @InterceptorIgnore(tenantLine = "true") + List listJobInstancesToRecheck(); + JobInstance getJobInstanceByTaskId(Integer id); @InterceptorIgnore(tenantLine = "true") diff --git a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java index 4993fbccac..b893192874 100644 --- a/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java +++ b/dinky-admin/src/main/java/org/dinky/service/JobInstanceService.java @@ -61,6 +61,8 @@ public interface JobInstanceService extends ISuperService { */ List listJobInstanceActive(); + List listJobInstancesToRecheck(); + /** * Get the job information detail for the given ID. * diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java index 236942209e..2e21ca4673 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/JobInstanceServiceImpl.java @@ -148,6 +148,11 @@ public List listJobInstanceActive() { return baseMapper.listJobInstanceActive(); } + @Override + public List listJobInstancesToRecheck() { + return baseMapper.listJobInstancesToRecheck(); + } + @Override public JobInfoDetail getJobInfoDetail(Integer id) { if (Asserts.isNull(TenantContextHolder.get())) { diff --git a/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask b/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask index 1392030df9..076fe61188 100644 --- a/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask +++ b/dinky-admin/src/main/resources/META-INF/services/org.dinky.daemon.task.DaemonTask @@ -1,3 +1,4 @@ org.dinky.job.FlinkJobTask org.dinky.job.SystemMetricsTask -org.dinky.job.ClearJobHistoryTask \ No newline at end of file +org.dinky.job.ClearJobHistoryTask +org.dinky.job.RecheckJobTask \ No newline at end of file diff --git a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml index a5ee9ed2dd..2658f8db2e 100644 --- a/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml +++ b/dinky-admin/src/main/resources/mapper/JobInstanceMapper.xml @@ -91,6 +91,19 @@ order by id desc + +