diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CollectorStatus.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CollectorStatus.java new file mode 100644 index 00000000000..8d0fd444c32 --- /dev/null +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CollectorStatus.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.common.constants; + +/** + * Enum representing the possible statuses of a collector. + */ +public enum CollectorStatus { + ONLINE, OFFLINE; +} diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java index addb281ce7a..7451fd44e67 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/Manager.java @@ -18,6 +18,8 @@ package org.apache.hertzbeat.manager; import javax.annotation.PostConstruct; + +import org.apache.hertzbeat.common.constants.ConfigConstants; import org.apache.hertzbeat.manager.nativex.HertzbeatRuntimeHintsRegistrar; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -36,10 +38,10 @@ @SpringBootApplication @EnableJpaAuditing -@EnableJpaRepositories(basePackages = {"org.apache.hertzbeat"}) -@EntityScan(basePackages = {"org.apache.hertzbeat"}) -@ComponentScan(basePackages = {"org.apache.hertzbeat"}) -@ConfigurationPropertiesScan(basePackages = {"org.apache.hertzbeat"}) +@EnableJpaRepositories(basePackages = {ConfigConstants.PkgConstant.PKG}) +@EntityScan(basePackages = {ConfigConstants.PkgConstant.PKG}) +@ComponentScan(basePackages = {ConfigConstants.PkgConstant.PKG}) +@ConfigurationPropertiesScan(basePackages = {ConfigConstants.PkgConstant.PKG}) @ImportRuntimeHints(HertzbeatRuntimeHintsRegistrar.class) @EnableAsync @EnableScheduling diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/status/CalculateStatus.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/status/CalculateStatus.java index 94a0d7ca0cf..ffea03f0a52 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/status/CalculateStatus.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/status/CalculateStatus.java @@ -40,7 +40,7 @@ import org.apache.hertzbeat.common.entity.manager.StatusPageComponent; import org.apache.hertzbeat.common.entity.manager.StatusPageHistory; import org.apache.hertzbeat.common.entity.manager.StatusPageOrg; -import org.apache.hertzbeat.manager.config.StatusProperties; +import org.apache.hertzbeat.manager.properties.StatusProperties; import org.apache.hertzbeat.manager.dao.MonitorDao; import org.apache.hertzbeat.manager.dao.StatusPageComponentDao; import org.apache.hertzbeat.manager.dao.StatusPageHistoryDao; diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/SchedulerConfig.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/SchedulerConfig.java index 1a8ad4221b5..6288b68783b 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/SchedulerConfig.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/SchedulerConfig.java @@ -18,8 +18,8 @@ package org.apache.hertzbeat.manager.config; import lombok.extern.slf4j.Slf4j; -import org.apache.hertzbeat.manager.scheduler.ConsistentHash; -import org.apache.hertzbeat.manager.scheduler.SchedulerProperties; +import org.apache.hertzbeat.manager.scheduler.ConsistentHashCollectorKeeper; +import org.apache.hertzbeat.manager.properties.SchedulerProperties; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,8 +33,8 @@ public class SchedulerConfig { @Bean - public ConsistentHash consistentHasInstance() { - return new ConsistentHash(); + public ConsistentHashCollectorKeeper consistentHasInstance() { + return new ConsistentHashCollectorKeeper(); } } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/nativex/HertzbeatRuntimeHintsRegistrar.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/nativex/HertzbeatRuntimeHintsRegistrar.java index 44d9b3a6a33..f219c1272db 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/nativex/HertzbeatRuntimeHintsRegistrar.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/nativex/HertzbeatRuntimeHintsRegistrar.java @@ -19,7 +19,6 @@ package org.apache.hertzbeat.manager.nativex; -import java.lang.reflect.Constructor; import java.util.Set; import org.apache.sshd.common.channel.ChannelListener; import org.apache.sshd.common.forward.PortForwardingEventListener; @@ -28,7 +27,6 @@ import org.apache.sshd.common.session.SessionListener; import org.apache.sshd.common.util.security.bouncycastle.BouncyCastleSecurityProviderRegistrar; import org.apache.sshd.common.util.security.eddsa.EdDSASecurityProviderRegistrar; -import org.springframework.aot.hint.ExecutableMode; import org.springframework.aot.hint.MemberCategory; import org.springframework.aot.hint.RuntimeHints; import org.springframework.aot.hint.RuntimeHintsRegistrar; @@ -60,11 +58,4 @@ public void registerHints(@NonNull RuntimeHints hints, ClassLoader classLoader) TypeReference.of(PortForwardingEventListener.class), TypeReference.of(SessionListener.class)); } } - - private void registerConstructor(RuntimeHints hints, Class clazz) { - Constructor[] declaredConstructors = clazz.getDeclaredConstructors(); - for (Constructor declaredConstructor : declaredConstructors) { - hints.reflection().registerConstructor(declaredConstructor, ExecutableMode.INVOKE); - } - } } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/CollectorNode.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/CollectorNode.java new file mode 100644 index 00000000000..2d711c69687 --- /dev/null +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/CollectorNode.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.pojo; + +import lombok.Data; +import org.apache.hertzbeat.common.constants.CollectorStatus; +import org.apache.hertzbeat.manager.scheduler.AssignJobs; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Collector Node + */ +@Data +public class CollectorNode { + /** + * Default number of VM nodes + */ + private static final byte VIRTUAL_NODE_DEFAULT_SIZE = 10; + + /** + * collector identity + */ + private final String identity; + + /** + * collector mode: public or private + */ + private String mode; + + /** + * ip + */ + private String ip; + + /** + * collector On-line time stamp + */ + private long uptime; + + /** + * collector's own performance service quality score 0 - 127 + * The number of virtual nodes will be calculated based on this service quality score + * + */ + private Byte quality; + + private CollectorStatus collectorStatus; + + /** + * use this collector's collect job ID list + * jobId,jobVersion + */ + private AssignJobs assignJobs; + + /** + * the collection task ID list mapped by each virtual node corresponding to this node + * Long[] [0]-jobId, [1]-dispatchHash + */ + private Map> virtualNodeMap; + + public CollectorNode(String identity, String mode, String ip, long uptime, Byte quality) { + this.identity = identity; + this.mode = mode; + this.ip = ip; + this.uptime = uptime; + this.quality = quality; + assignJobs = new AssignJobs(); + virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE); + } + + public synchronized void addJob(Integer virtualNodeKey, Integer dispatchHash, Long jobId, boolean isFlushed) { + if (virtualNodeMap == null) { + virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE); + } + if (assignJobs == null) { + assignJobs = new AssignJobs(); + } + Set virtualNodeJob = virtualNodeMap.computeIfAbsent(virtualNodeKey, k -> ConcurrentHashMap.newKeySet(16)); + virtualNodeJob.add(new Long[]{jobId, dispatchHash.longValue()}); + if (isFlushed) { + assignJobs.addAssignJob(jobId); + } else { + assignJobs.addAddingJob(jobId); + } + } + + /** + * obtain the collection task routed by the specified virtual node according to virtualNodeKey + * @param virtualNodeKey virtualNodeKey + * @return collection task + */ + public Set clearVirtualNodeJobs(Integer virtualNodeKey) { + if (virtualNodeMap == null || virtualNodeMap.isEmpty()) { + return null; + } + Set virtualNodeJobs = virtualNodeMap.remove(virtualNodeKey); + virtualNodeMap.put(virtualNodeKey, ConcurrentHashMap.newKeySet(16)); + return virtualNodeJobs; + } + + public void addVirtualNodeJobs(Integer virtualHashKey, Set reDispatchJobs) { + if (reDispatchJobs == null) { + return; + } + if (virtualNodeMap == null) { + virtualNodeMap = new ConcurrentHashMap<>(16); + } + virtualNodeMap.computeIfPresent(virtualHashKey, (k, v) -> { + reDispatchJobs.addAll(v); + return v; + }); + virtualNodeMap.put(virtualHashKey, reDispatchJobs); + } + + public void removeVirtualNodeJob(Long jobId) { + if (jobId == null || virtualNodeMap == null) { + return; + } + for (Set jobSet : virtualNodeMap.values()) { + Optional optional = jobSet.stream().filter(item -> Objects.equals(item[0], jobId)).findFirst(); + if (optional.isPresent()) { + jobSet.remove(optional.get()); + break; + } + } + } + + public void destroy() { + if (assignJobs != null) { + assignJobs.clear(); + } + if (virtualNodeMap != null) { + virtualNodeMap.clear(); + } + } +} diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/JobCache.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/JobCache.java new file mode 100644 index 00000000000..33ebb714323 --- /dev/null +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/pojo/JobCache.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.pojo; + +import org.apache.hertzbeat.common.entity.job.Job; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Utility class for caching {@link Job} objects in memory. + *

+ * This class provides static methods to store, retrieve, and remove {@code Job} instances + * using a thread-safe {@link ConcurrentHashMap}. It is intended to be used as a simple + * in-memory cache for job data within the manager component. + *

+ * Usage: + *

+ *     JobCache.put(job);
+ *     Job job = JobCache.get(jobId);
+ *     JobCache.remove(jobId);
+ * 
+ */ +public class JobCache { + private static final Map jobContentCache = new ConcurrentHashMap<>(16); + + public static Job get(Long jobId) { + return jobContentCache.get(jobId); + } + + public static void put(Job job) { + jobContentCache.put(job.getId(), job); + } + + public static void remove(Long jobId) { + jobContentCache.remove(jobId); + } +} diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerProperties.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/SchedulerProperties.java similarity index 97% rename from hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerProperties.java rename to hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/SchedulerProperties.java index 826a8049275..888f1e78597 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerProperties.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/SchedulerProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.manager.scheduler; +package org.apache.hertzbeat.manager.properties; import lombok.Getter; import lombok.Setter; diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/StatusProperties.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/StatusProperties.java similarity index 96% rename from hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/StatusProperties.java rename to hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/StatusProperties.java index 7a2d9daa474..ef57172e6fc 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/StatusProperties.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/properties/StatusProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.hertzbeat.manager.config; +package org.apache.hertzbeat.manager.properties; import lombok.Getter; import lombok.Setter; diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/AssignJobs.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/AssignJobs.java index 90f795c72d6..22ae14a974a 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/AssignJobs.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/AssignJobs.java @@ -29,6 +29,7 @@ @Slf4j @Data public class AssignJobs { + private static final Integer DEFAULT_CAPACITY = 16; /** * current assign jobIds @@ -51,10 +52,10 @@ public class AssignJobs { private Set pinnedJobs; public AssignJobs() { - jobs = ConcurrentHashMap.newKeySet(16); - addingJobs = ConcurrentHashMap.newKeySet(16); - removingJobs = ConcurrentHashMap.newKeySet(16); - pinnedJobs = ConcurrentHashMap.newKeySet(16); + jobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY); + addingJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY); + removingJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY); + pinnedJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY); } public void addAssignJob(Long jobId) { diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index c939200e762..491f1fe0d17 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -29,11 +29,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService; import org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectResponseEventListener; +import org.apache.hertzbeat.common.constants.CollectorStatus; import org.apache.hertzbeat.common.constants.CommonConstants; import org.apache.hertzbeat.common.entity.dto.CollectorInfo; import org.apache.hertzbeat.common.entity.dto.ServerInfo; @@ -53,6 +56,10 @@ import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; import org.apache.hertzbeat.manager.dao.MonitorDao; import org.apache.hertzbeat.manager.dao.ParamDao; +import org.apache.hertzbeat.manager.pojo.CollectorNode; +import org.apache.hertzbeat.manager.pojo.JobCache; +import org.apache.hertzbeat.manager.properties.SchedulerProperties; +import org.apache.hertzbeat.manager.scheduler.collector.CollectorKeeper; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.manager.service.AppService; import org.springframework.beans.factory.annotation.Autowired; @@ -65,9 +72,7 @@ @Component @AutoConfigureAfter(value = {SchedulerProperties.class}) @Slf4j -public class CollectorJobScheduler implements CollectorScheduling, CollectJobScheduling { - - private final Map jobContentCache = new ConcurrentHashMap<>(16); +public class CollectorJobScheduler implements CollectorOperation, CollectorOperationReceiver, JobOperation { private final Map eventListeners = new ConcurrentHashMap<>(16); @@ -77,9 +82,6 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch @Autowired private CollectorMonitorBindDao collectorMonitorBindDao; - @Autowired - private ConsistentHash consistentHash; - @Autowired private CollectJobService collectJobService; @@ -92,6 +94,10 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch @Autowired private ParamDao paramDao; + @Autowired + private CollectorKeeper collectorKeeper; + + @Setter private ManageServer manageServer; @Override @@ -125,15 +131,18 @@ public void collectorGoOnline(String identity, CollectorInfo collectorInfo) { .build(); } collectorDao.save(collector); - ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(), - collector.getIp(), System.currentTimeMillis(), null); - consistentHash.addNode(node); - reBalanceCollectorAssignJobs(); + + CollectorNode node = new CollectorNode(identity, collector.getMode(), collector.getIp(), System.currentTimeMillis(), null); + collectorKeeper.addNode(node); + collectorKeeper.changeStatus(identity, CollectorStatus.ONLINE); + collectorKeeper.rebalanceJobs(this::doRebalanceJobs); + // Read database The fixed collection tasks at this collector are delivered List binds = collectorMonitorBindDao.findCollectorMonitorBindsByCollector(identity); if (CollectionUtils.isEmpty(binds)){ return; } + List monitors = monitorDao.findMonitorsByIdIn(binds.stream().map(CollectorMonitorBind::getMonitorId).collect(Collectors.toSet())); for (Monitor monitor : monitors) { if (Objects.isNull(monitor) || monitor.getStatus() == CommonConstants.MONITOR_PAUSED_CODE) { @@ -189,56 +198,10 @@ public void collectorGoOffline(String identity) { } collector.setStatus(CommonConstants.COLLECTOR_STATUS_OFFLINE); collectorDao.save(collector); - consistentHash.removeNode(identity); - reBalanceCollectorAssignJobs(); - log.info("the collector: {} go offline success.", identity); - } - @Override - public void reBalanceCollectorAssignJobs() { - consistentHash.getAllNodes().entrySet().parallelStream().forEach(entry -> { - String collectorName = entry.getKey(); - AssignJobs assignJobs = entry.getValue().getAssignJobs(); - if (StringUtils.isBlank(collectorName) || Objects.isNull(assignJobs)) { - return; - } - if (CollectionUtils.isNotEmpty(assignJobs.getAddingJobs())) { - Set addedJobIds = new HashSet<>(8); - for (Long addingJobId : assignJobs.getAddingJobs()) { - Job job = jobContentCache.get(addingJobId); - if (Objects.isNull(job)) { - log.error("assigning job {} content is null.", addingJobId); - continue; - } - addedJobIds.add(addingJobId); - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { - collectJobService.addAsyncCollectJob(job); - } else { - ClusterMsg.Message message = ClusterMsg.Message.newBuilder() - .setDirection(ClusterMsg.Direction.REQUEST) - .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK) - .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) - .build(); - this.manageServer.sendMsg(collectorName, message); - } - } - assignJobs.addAssignJobs(addedJobIds); - assignJobs.removeAddingJobs(addedJobIds); - } - if (CollectionUtils.isNotEmpty(assignJobs.getRemovingJobs())) { - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { - assignJobs.getRemovingJobs().forEach(jobId -> collectJobService.cancelAsyncCollectJob(jobId)); - } else { - ClusterMsg.Message message = ClusterMsg.Message.newBuilder() - .setDirection(ClusterMsg.Direction.REQUEST) - .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) - .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(assignJobs.getRemovingJobs()))) - .build(); - this.manageServer.sendMsg(collectorName, message); - } - assignJobs.clearRemovingJobs(); - } - }); + collectorKeeper.changeStatus(identity, CollectorStatus.OFFLINE); + collectorKeeper.rebalanceJobs(this::doRebalanceJobs); + log.info("the collector: {} go offline success.", identity); } @Override @@ -285,56 +248,11 @@ public boolean onlineCollector(String identity) { return true; } - @Override - public List collectSyncJobData(Job job) { - // todo dispatchKey ip+port or id - String dispatchKey = String.valueOf(job.getMonitorId()); - ConsistentHash.Node node = consistentHash.preDispatchJob(dispatchKey); - if (Objects.isNull(node)) { - log.error("there is no collector online to assign job."); - CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() - .setCode(CollectRep.Code.FAIL) - .setMsg("no collector online to assign job") - .build(); - return Collections.singletonList(metricsData); - } - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) { - return collectJobService.collectSyncJobData(job); - } else { - List metricsData = new LinkedList<>(); - CountDownLatch countDownLatch = new CountDownLatch(1); - - ClusterMsg.Message message = ClusterMsg.Message.newBuilder() - .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK) - .setDirection(ClusterMsg.Direction.REQUEST) - .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) - .build(); - boolean result = this.manageServer.sendMsg(node.getIdentity(), message); - - if (result) { - CollectResponseEventListener listener = new CollectResponseEventListener() { - @Override - public void response(List responseMetrics) { - if (responseMetrics != null) { - metricsData.addAll(responseMetrics); - } - countDownLatch.countDown(); - } - }; - eventListeners.put(job.getMonitorId(), listener); - } - try { - countDownLatch.await(120, TimeUnit.SECONDS); - } catch (Exception e) { - log.info("The sync task runs for 120 seconds with no response and returns"); - } - return metricsData; - } - } - @Override public List collectSyncJobData(Job job, String collector) { - ConsistentHash.Node node = consistentHash.getNode(collector); + CollectorNode node = StringUtils.isBlank(collector) + ? collectorKeeper.determineNode(job.getMonitorId()) + : collectorKeeper.getNode(collector); if (Objects.isNull(node)) { log.error("there is no collector online to assign job."); CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() @@ -343,9 +261,11 @@ public List collectSyncJobData(Job job, String collector .build(); return Collections.singletonList(metricsData); } + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) { return collectJobService.collectSyncJobData(job); } + List metricsData = new LinkedList<>(); ClusterMsg.Message message = ClusterMsg.Message.newBuilder() .setType(ClusterMsg.MessageType.ISSUE_ONE_TIME_TASK) @@ -378,25 +298,10 @@ public void response(List responseMetrics) { public long addAsyncCollectJob(Job job, String collector) { long jobId = SnowFlakeIdGenerator.generateId(); job.setId(jobId); - jobContentCache.put(jobId, job); - ConsistentHash.Node node; - if (StringUtils.isBlank(collector)) { - // todo dispatchKey ip+port or id - String dispatchKey = String.valueOf(job.getMonitorId()); - node = consistentHash.dispatchJob(dispatchKey, jobId); - if (node == null) { - log.error("there is no collector online to assign job."); - return jobId; - } - } else { - node = consistentHash.getNode(collector); - if (node == null) { - log.error("there is no collector name: {} online to assign job.", collector); - return jobId; - } - node.getAssignJobs().addPinnedJob(jobId); - } - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) { + + CollectorNode collectorNode = collectorKeeper.addJob(job, collector); + + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorNode.getIdentity())) { collectJobService.addAsyncCollectJob(job); } else { ClusterMsg.Message message = ClusterMsg.Message.newBuilder() @@ -404,27 +309,16 @@ public long addAsyncCollectJob(Job job, String collector) { .setDirection(ClusterMsg.Direction.REQUEST) .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) .build(); - this.manageServer.sendMsg(node.getIdentity(), message); + this.manageServer.sendMsg(collectorNode.getIdentity(), message); } return jobId; } - @Override - public long updateAsyncCollectJob(Job modifyJob) { - // delete and add - long preJobId = modifyJob.getId(); - long newJobId = addAsyncCollectJob(modifyJob, null); - jobContentCache.remove(preJobId); - cancelAsyncCollectJob(preJobId); - return newJobId; - } - @Override public long updateAsyncCollectJob(Job modifyJob, String collector) { // delete and add long preJobId = modifyJob.getId(); long newJobId = addAsyncCollectJob(modifyJob, collector); - jobContentCache.remove(preJobId); cancelAsyncCollectJob(preJobId); return newJobId; } @@ -434,24 +328,21 @@ public void cancelAsyncCollectJob(Long jobId) { if (jobId == null) { return; } - jobContentCache.remove(jobId); - for (ConsistentHash.Node node : consistentHash.getAllNodes().values()) { - AssignJobs assignJobs = node.getAssignJobs(); - if (assignJobs.getPinnedJobs().remove(jobId) - || assignJobs.getJobs().remove(jobId) || assignJobs.getAddingJobs().remove(jobId)) { - node.removeVirtualNodeJob(jobId); - if (CommonConstants.MAIN_COLLECTOR_NODE.equals(node.getIdentity())) { - collectJobService.cancelAsyncCollectJob(jobId); - } else { - ClusterMsg.Message deleteMessage = ClusterMsg.Message.newBuilder() - .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) - .setDirection(ClusterMsg.Direction.REQUEST) - .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(List.of(jobId)))) - .build(); - this.manageServer.sendMsg(node.getIdentity(), deleteMessage); - } - // break; if is there jod exist in multi collector? - } + + CollectorNode collectorNode = collectorKeeper.removeJob(jobId); + if (collectorNode == null) { + return; + } + + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorNode.getIdentity())) { + collectJobService.cancelAsyncCollectJob(jobId); + } else { + ClusterMsg.Message deleteMessage = ClusterMsg.Message.newBuilder() + .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) + .setDirection(ClusterMsg.Direction.REQUEST) + .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(List.of(jobId)))) + .build(); + this.manageServer.sendMsg(collectorNode.getIdentity(), deleteMessage); } } @@ -468,7 +359,55 @@ public void collectSyncJobResponse(List metricsDataList) } } - public void setManageServer(ManageServer manageServer) { - this.manageServer = manageServer; + private void doRebalanceJobs(AssignJobs assignJobs, String collectorName) { + handleAddingJobs(assignJobs, collectorName); + + handleRemovingJobs(assignJobs, collectorName); + } + + private void handleAddingJobs(AssignJobs assignJobs, String collectorName) { + if (CollectionUtils.isEmpty(assignJobs.getAddingJobs())) { + return; + } + + Set addedJobIds = new HashSet<>(8); + for (Long addingJobId : assignJobs.getAddingJobs()) { + Job job = JobCache.get(addingJobId); + if (Objects.isNull(job)) { + log.error("assigning job {} content is null.", addingJobId); + continue; + } + addedJobIds.add(addingJobId); + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { + collectJobService.addAsyncCollectJob(job); + } else { + ClusterMsg.Message message = ClusterMsg.Message.newBuilder() + .setDirection(ClusterMsg.Direction.REQUEST) + .setType(ClusterMsg.MessageType.ISSUE_CYCLIC_TASK) + .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(job))) + .build(); + this.manageServer.sendMsg(collectorName, message); + } + } + assignJobs.addAssignJobs(addedJobIds); + assignJobs.removeAddingJobs(addedJobIds); + } + + private void handleRemovingJobs(AssignJobs assignJobs, String collectorName) { + if (CollectionUtils.isEmpty(assignJobs.getRemovingJobs())) { + return; + } + + if (CommonConstants.MAIN_COLLECTOR_NODE.equals(collectorName)) { + assignJobs.getRemovingJobs().forEach(jobId -> collectJobService.cancelAsyncCollectJob(jobId)); + } else { + ClusterMsg.Message message = ClusterMsg.Message.newBuilder() + .setDirection(ClusterMsg.Direction.REQUEST) + .setType(ClusterMsg.MessageType.DELETE_CYCLIC_TASK) + .setMsg(ByteString.copyFromUtf8(JsonUtil.toJson(assignJobs.getRemovingJobs()))) + .build(); + this.manageServer.sendMsg(collectorName, message); + } + assignJobs.clearRemovingJobs(); } } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperation.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperation.java new file mode 100644 index 00000000000..24e21f05a14 --- /dev/null +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.scheduler; + +/** + * Interface defining operations for managing collector. + * Implementations of this interface provide functionality to control the operation state + * of collectors in the system. + */ +public interface CollectorOperation { + + /** + * Takes a collector offline by stopping its collection operations. + * This is typically used for maintenance, updates, or when the collector is no longer needed. + * + * @param identity The unique identifier of the collector to be taken offline + * @return true if the collector was successfully taken offline, + * false if the operation failed or the collector wasn't found + */ + boolean offlineCollector(String identity); + + /** + * Brings a collector online by starting its collection operations. + * This is used to activate a collector that was previously offline. + * + * @param identity The unique identifier of the collector to be brought online + * @return true if the collector was successfully brought online, + * false if the operation failed or the collector wasn't found + */ + boolean onlineCollector(String identity); +} diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorScheduling.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperationReceiver.java similarity index 54% rename from hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorScheduling.java rename to hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperationReceiver.java index 76ad110c81a..d6b64743aa0 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorScheduling.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorOperationReceiver.java @@ -20,39 +20,25 @@ import org.apache.hertzbeat.common.entity.dto.CollectorInfo; /** - * slave collector service + * Interface defining operations for receiving collector status updates from remote collectors. + * This interface serves as a callback mechanism for handling collector online/offline events. */ -public interface CollectorScheduling { - +public interface CollectorOperationReceiver { /** - * register collector go online - * @param identity collector identity name - * @param collectorInfo collector information + * Notifies the system when a collector comes online. + * This method should be called when a collector establishes connection and becomes available. + * + * @param identity The unique identifier of the collector (e.g., hostname, IP, or custom ID) + * @param collectorInfo Detailed information about the collector including capabilities, + * configuration, and status metadata */ void collectorGoOnline(String identity, CollectorInfo collectorInfo); - - /** - * register collector go offline - * @param identity collector identity name - */ - void collectorGoOffline(String identity); - - /** - * reBalance dispatch monitoring jobs when collector go online or offline or timeout - */ - void reBalanceCollectorAssignJobs(); - - /** - * offline collector(stop collector collect operation) - * @param identity collector identity name - * @return true/false - */ - boolean offlineCollector(String identity); /** - * online collector(start collector collect operation) - * @param identity collector identity name - * @return true/false + * Notifies the system when a collector goes offline. + * This method should be called when a collector disconnects or becomes unavailable. + * + * @param identity The unique identifier of the collector to be marked as offline */ - boolean onlineCollector(String identity); + void collectorGoOffline(String identity); } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeper.java similarity index 51% rename from hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java rename to hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeper.java index 44d1707aaff..fb85cb65e16 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHash.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeper.java @@ -23,46 +23,149 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; import java.util.stream.Collectors; + import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.hertzbeat.common.constants.CollectorStatus; import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.manager.pojo.CollectorNode; +import org.apache.hertzbeat.manager.pojo.JobCache; +import org.apache.hertzbeat.manager.scheduler.collector.CollectorKeeper; /** * Collector and task mapping scheduling implemented by consistent hashing */ @Slf4j -public class ConsistentHash { +public class ConsistentHashCollectorKeeper implements CollectorKeeper { /** * consistent hash circle */ - private final ConcurrentTreeMap hashCircle; + private final ConcurrentTreeMap hashCircle = new ConcurrentTreeMap<>(); /** * collector node */ - private final Map existNodeMap; + private final Map existNodeMap = new ConcurrentHashMap<>(16); /** - * not dispatched job cache + * not dispatched job cache, in order to obtain the cached collection scheduling task */ - private final List dispatchJobCache; + @Getter + private final List dispatchJobCache = Collections.synchronizedList(new LinkedList<>()); /** * Default number of VM nodes */ private static final byte VIRTUAL_NODE_DEFAULT_SIZE = 10; - public ConsistentHash() { - hashCircle = new ConcurrentTreeMap<>(); - existNodeMap = new ConcurrentHashMap<>(16); - dispatchJobCache = Collections.synchronizedList(new LinkedList<>()); + /** + * add collector node + * @param newNode node + */ + @Override + public void addNode(CollectorNode newNode) { + // when mode is cluster public, need reBalance dispatch jobs. else not when is cloud-edge private + if (!CommonConstants.MODE_PRIVATE.equals(newNode.getMode())) { + byte virtualNodeNum = newNode.getQuality() == null ? VIRTUAL_NODE_DEFAULT_SIZE : newNode.getQuality(); + for (byte i = 0; i < virtualNodeNum; i++) { + addVirtualNode(newNode, newNode.getIdentity() + i); + } + } + existNodeMap.put(newNode.getIdentity(), newNode); + dispatchJobInCache(); + } + + @Override + public CollectorNode addJob(Job job, String collectorId) { + JobCache.put(job); + CollectorNode collectorNode; + + if (StringUtils.isBlank(collectorId)) { + // todo dispatchKey ip+port or id + String dispatchKey = String.valueOf(job.getMonitorId()); + collectorNode = this.dispatchJob(dispatchKey, job.getId()); + if (collectorNode == null) { + log.error("there is no collector online to assign job."); + } + } else { + collectorNode = getNode(collectorId); + if (collectorNode == null) { + log.error("there is no collector name: {} online to assign job.", collectorId); + return null; + } + collectorNode.getAssignJobs().addPinnedJob(job.getId()); + } + + return collectorNode; + } + + /** + * get node + * @param collectorName collector name + * @return node + */ + @Override + public CollectorNode getNode(String collectorName) { + return existNodeMap.get(collectorName); + } + + @Override + public CollectorNode determineNode(Long jobId) { + String dispatchKey = String.valueOf(jobId); + if (dispatchKey == null || StringUtils.isBlank(dispatchKey)) { + log.error("The dispatch key can not null."); + return null; + } + int dispatchHash = hash(dispatchKey); + return preDispatchJob(dispatchHash); + } + + @Override + public void changeStatus(String collectorId, CollectorStatus collectorStatus) { + switch (collectorStatus) { + case ONLINE -> this.getNode(collectorId).setCollectorStatus(collectorStatus); + case OFFLINE -> this.removeNode(collectorId); + default -> {} + } + } + + @Override + public void rebalanceJobs(BiConsumer assignJobCollectorConsumer) { + existNodeMap.entrySet().parallelStream().forEach(entry -> { + String collectorName = entry.getKey(); + AssignJobs assignJobs = entry.getValue().getAssignJobs(); + if (StringUtils.isBlank(collectorName) || Objects.isNull(assignJobs)) { + return; + } + + assignJobCollectorConsumer.accept(assignJobs, collectorName); + }); + } + + @Override + public CollectorNode removeJob(Long jobId) { + JobCache.remove(jobId); + + for (CollectorNode node : existNodeMap.values()) { + AssignJobs assignJobs = node.getAssignJobs(); + if (assignJobs.getPinnedJobs().remove(jobId) + || assignJobs.getJobs().remove(jobId) || assignJobs.getAddingJobs().remove(jobId)) { + node.removeVirtualNodeJob(jobId); + + return node; + // break; if is there jod exist in multi collector? + } + } + + return null; } /** @@ -70,15 +173,15 @@ public ConsistentHash() { * @param newNode node * @param identity virtual node identity */ - public synchronized void addVirtualNode(Node newNode, String identity){ + private synchronized void addVirtualNode(CollectorNode newNode, String identity){ int virtualHashKey = hash(identity); hashCircle.put(virtualHashKey, newNode); newNode.addVirtualNodeJobs(virtualHashKey, ConcurrentHashMap.newKeySet(16)); - Map.Entry higherVirtualNode = hashCircle.higherOrFirstEntry(virtualHashKey); + Map.Entry higherVirtualNode = hashCircle.higherOrFirstEntry(virtualHashKey); // Reassign tasks that are routed to the higherVirtualNode virtual node // Tasks are either on the original virtual node or on the new virtual node Integer higherVirtualNodeKey = higherVirtualNode.getKey(); - Node higherNode = higherVirtualNode.getValue(); + CollectorNode higherNode = higherVirtualNode.getValue(); Set dispatchJobs = higherNode.clearVirtualNodeJobs(higherVirtualNodeKey); if (dispatchJobs != null && !dispatchJobs.isEmpty()) { Set reDispatchJobs = ConcurrentHashMap.newKeySet(dispatchJobs.size()); @@ -91,31 +194,15 @@ public synchronized void addVirtualNode(Node newNode, String identity){ iterator.remove(); } } - higherNode.virtualNodeMap.put(higherVirtualNodeKey, dispatchJobs); + higherNode.getVirtualNodeMap().put(higherVirtualNodeKey, dispatchJobs); Set jobIds = reDispatchJobs.stream().map(item -> item[0]).collect(Collectors.toSet()); newNode.addVirtualNodeJobs(virtualHashKey, reDispatchJobs); if (higherNode != newNode) { - higherNode.assignJobs.removeAssignJobs(jobIds); - higherNode.assignJobs.addRemovingJobs(jobIds); - newNode.assignJobs.addAddingJobs(jobIds); - } - } - } - - /** - * add collector node - * @param newNode node - */ - public void addNode(Node newNode) { - // when mode is cluster public, need reBalance dispatch jobs. else not when is cloud-edge private - if (!CommonConstants.MODE_PRIVATE.equals(newNode.mode)) { - byte virtualNodeNum = newNode.quality == null ? VIRTUAL_NODE_DEFAULT_SIZE : newNode.quality; - for (byte i = 0; i < virtualNodeNum; i++) { - addVirtualNode(newNode, newNode.identity + i); + higherNode.getAssignJobs().removeAssignJobs(jobIds); + higherNode.getAssignJobs().addRemovingJobs(jobIds); + newNode.getAssignJobs().addAddingJobs(jobIds); } } - existNodeMap.put(newNode.identity, newNode); - dispatchJobInCache(); } /** @@ -123,21 +210,21 @@ public void addNode(Node newNode) { * @param deletedNode node * @param virtualNodeHash virtual node hash key */ - public synchronized void removeVirtualNode(Node deletedNode, Integer virtualNodeHash) { - Set removeJobHashSet = deletedNode.virtualNodeMap.get(virtualNodeHash); + private synchronized void removeVirtualNode(CollectorNode deletedNode, Integer virtualNodeHash) { + Set removeJobHashSet = deletedNode.getVirtualNodeMap().get(virtualNodeHash); // Migrate the virtualNodeEntry collection task to the nearest virtual node that is larger than it hashCircle.remove(virtualNodeHash); if (removeJobHashSet == null || removeJobHashSet.isEmpty()) { return; } - Map.Entry higherVirtualEntry = hashCircle.higherOrFirstEntry(virtualNodeHash); + Map.Entry higherVirtualEntry = hashCircle.higherOrFirstEntry(virtualNodeHash); if (higherVirtualEntry == null || higherVirtualEntry.getValue() == deletedNode) { higherVirtualEntry = null; } // jobId Set removeJobIds = removeJobHashSet.stream().map(item -> item[0]).collect(Collectors.toSet()); - deletedNode.assignJobs.removeAssignJobs(removeJobIds); - deletedNode.assignJobs.addRemovingJobs(removeJobIds); + deletedNode.getAssignJobs().removeAssignJobs(removeJobIds); + deletedNode.getAssignJobs().addRemovingJobs(removeJobIds); if (higherVirtualEntry == null) { // jobId-dispatchHash removeJobHashSet.forEach(value -> { @@ -150,9 +237,9 @@ public synchronized void removeVirtualNode(Node deletedNode, Integer virtualNode } }); } else { - Node higherVirtualNode = higherVirtualEntry.getValue(); + CollectorNode higherVirtualNode = higherVirtualEntry.getValue(); higherVirtualNode.addVirtualNodeJobs(higherVirtualEntry.getKey(), removeJobHashSet); - higherVirtualNode.assignJobs.addAddingJobs(removeJobIds); + higherVirtualNode.getAssignJobs().addAddingJobs(removeJobIds); } } @@ -160,20 +247,19 @@ public synchronized void removeVirtualNode(Node deletedNode, Integer virtualNode * deleted collector node * @param name collector name */ - public Node removeNode(String name) { - Node deletedNode = existNodeMap.remove(name); + private void removeNode(String name) { + CollectorNode deletedNode = existNodeMap.remove(name); if (deletedNode == null) { - return null; - } - for (Integer virtualNodeHash : deletedNode.virtualNodeMap.keySet()) { - removeVirtualNode(deletedNode, virtualNodeHash); + return; } + + deletedNode.getVirtualNodeMap().keySet() + .forEach(virtualNodeHash -> removeVirtualNode(deletedNode, virtualNodeHash)); deletedNode.destroy(); dispatchJobInCache(); - return deletedNode; } - public synchronized void dispatchJobInCache() { + private synchronized void dispatchJobInCache() { if (!dispatchJobCache.isEmpty()) { int size = dispatchJobCache.size(); for (int index = 0; index < size; index++) { @@ -183,31 +269,6 @@ public synchronized void dispatchJobInCache() { } } - /** - * get all collector nodes - * @return nodes - */ - public Map getAllNodes() { - return existNodeMap; - } - - /** - * get node - * @param collectorName collector name - * @return node - */ - public Node getNode(String collectorName) { - return existNodeMap.get(collectorName); - } - - /** - * Obtain the cached collection scheduling task - * @return cache task - */ - public List getDispatchJobCache() { - return dispatchJobCache; - } - /** * obtain the collector node according to the collection task information * @@ -215,7 +276,7 @@ public List getDispatchJobCache() { * @param jobId jobId * @return collector node */ - public Node dispatchJob(String dispatchKey, Long jobId) { + private CollectorNode dispatchJob(String dispatchKey, Long jobId) { if (dispatchKey == null || StringUtils.isBlank(dispatchKey)) { log.error("The dispatch key can not null."); return null; @@ -224,38 +285,23 @@ public Node dispatchJob(String dispatchKey, Long jobId) { return dispatchJob(dispatchHash, jobId, true); } - /** - * The collector node to which the collector is assigned is obtained in advance based on the collection task information - * - * @param dispatchKey collector task route key: ip+appId - * @return collector node - */ - public Node preDispatchJob(String dispatchKey) { - if (dispatchKey == null || StringUtils.isBlank(dispatchKey)) { - log.error("The dispatch key can not null."); - return null; - } - int dispatchHash = hash(dispatchKey); - return preDispatchJob(dispatchHash); - } - /** * Obtain the collector node to which the collector is assigned based on the collection task information * * @param dispatchHash The task route hash is collected * @param jobId jobId - * @param isFlushed is has flush this job or wait to dispatch + * @param isFlushed if it has flushed this job or wait to dispatch * @return collector node */ - public Node dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) { + private CollectorNode dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) { if (dispatchHash == null || hashCircle == null || hashCircle.isEmpty()) { log.warn("There is no available collector registered. Cache the job {}.", jobId); dispatchJobCache.add(new DispatchJob(dispatchHash, jobId)); return null; } - Map.Entry ceilEntry = hashCircle.ceilingOrFirstEntry(dispatchHash); + Map.Entry ceilEntry = hashCircle.ceilingOrFirstEntry(dispatchHash); int virtualKey = ceilEntry.getKey(); - Node curNode = ceilEntry.getValue(); + CollectorNode curNode = ceilEntry.getValue(); curNode.addJob(virtualKey, dispatchHash, jobId, isFlushed); return curNode; @@ -267,25 +313,15 @@ public Node dispatchJob(Integer dispatchHash, Long jobId, boolean isFlushed) { * @param dispatchHash The task route hash is collected * @return collector node */ - public Node preDispatchJob(Integer dispatchHash) { + private CollectorNode preDispatchJob(Integer dispatchHash) { if (dispatchHash == null || hashCircle == null || hashCircle.isEmpty()) { log.warn("There is no available collector registered."); return null; } - Map.Entry ceilEntry = hashCircle.ceilingOrFirstEntry(dispatchHash); + Map.Entry ceilEntry = hashCircle.ceilingOrFirstEntry(dispatchHash); return ceilEntry.getValue(); } - /** - * hash long - * @param key long value - * @return hash value - */ - private int hash(long key) { - String keyStr = String.valueOf(key); - return hash(keyStr); - } - /** * FNV1_32_HASH algorithm * @param key the key @@ -314,7 +350,7 @@ private int hash(String key) { * dispatch job summary */ @AllArgsConstructor - public static class DispatchJob { + private static class DispatchJob { /** * dispatch task route key @@ -327,130 +363,4 @@ public static class DispatchJob { @Getter private Long jobId; } - - /** - * collector node machine address - */ - public static class Node { - - /** - * collector identity - */ - @Getter - private final String identity; - - /** - * collector mode: public or private - */ - private final String mode; - - /** - * ip - */ - private final String ip; - - /** - * collector On-line time stamp - */ - private final long uptime; - - /** - * collector's own performance service quality score 0 - 127 - * The number of virtual nodes will be calculated based on this service quality score - * - */ - private final Byte quality; - - /** - * use this collector's collect job ID list - * jobId,jobVersion - */ - private AssignJobs assignJobs; - - /** - * the collection task ID list mapped by each virtual node corresponding to this node - * Long[] [0]-jobId, [1]-dispatchHash - */ - private Map> virtualNodeMap; - - public Node(String identity, String mode, String ip, long uptime, Byte quality) { - this.identity = identity; - this.mode = mode; - this.ip = ip; - this.uptime = uptime; - this.quality = quality; - assignJobs = new AssignJobs(); - virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE); - } - - private synchronized void addJob(Integer virtualNodeKey, Integer dispatchHash, Long jobId, boolean isFlushed) { - if (virtualNodeMap == null) { - virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE); - } - if (assignJobs == null) { - assignJobs = new AssignJobs(); - } - Set virtualNodeJob = virtualNodeMap.computeIfAbsent(virtualNodeKey, k -> ConcurrentHashMap.newKeySet(16)); - virtualNodeJob.add(new Long[]{jobId, dispatchHash.longValue()}); - if (isFlushed) { - assignJobs.addAssignJob(jobId); - } else { - assignJobs.addAddingJob(jobId); - } - } - - /** - * obtain the collection task routed by the specified virtual node according to virtualNodeKey - * @param virtualNodeKey virtualNodeKey - * @return collection task - */ - private Set clearVirtualNodeJobs(Integer virtualNodeKey) { - if (virtualNodeMap == null || virtualNodeMap.isEmpty()) { - return null; - } - Set virtualNodeJobs = virtualNodeMap.remove(virtualNodeKey); - virtualNodeMap.put(virtualNodeKey, ConcurrentHashMap.newKeySet(16)); - return virtualNodeJobs; - } - - private void addVirtualNodeJobs(Integer virtualHashKey, Set reDispatchJobs) { - if (reDispatchJobs == null) { - return; - } - if (virtualNodeMap == null) { - virtualNodeMap = new ConcurrentHashMap<>(16); - } - virtualNodeMap.computeIfPresent(virtualHashKey, (k, v) -> { - reDispatchJobs.addAll(v); - return v; - }); - virtualNodeMap.put(virtualHashKey, reDispatchJobs); - } - - public void removeVirtualNodeJob(Long jobId) { - if (jobId == null || virtualNodeMap == null) { - return; - } - for (Set jobSet : virtualNodeMap.values()) { - Optional optional = jobSet.stream().filter(item -> Objects.equals(item[0], jobId)).findFirst(); - if (optional.isPresent()) { - jobSet.remove(optional.get()); - break; - } - } - } - - public AssignJobs getAssignJobs() { - return assignJobs; - } - - public void destroy() { - if (assignJobs != null) { - assignJobs.clear(); - } - if (virtualNodeMap != null) { - virtualNodeMap.clear(); - } - } - } } diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectJobScheduling.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/JobOperation.java similarity index 81% rename from hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectJobScheduling.java rename to hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/JobOperation.java index 55137e207fe..3f0071824c3 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectJobScheduling.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/JobOperation.java @@ -24,15 +24,8 @@ /** * Collection job management provides api interface */ -public interface CollectJobScheduling { +public interface JobOperation { - /** - * Execute a one-time collection task and get the collected data response - * @param job Collect task details - * @return Collection results - */ - List collectSyncJobData(Job job); - /** * Execute a one-time collection task and get the collected data response * @param job Collect task details @@ -49,13 +42,6 @@ public interface CollectJobScheduling { */ long addAsyncCollectJob(Job job, String collector); - /** - * Update the periodic asynchronous collection tasks that have been delivered - * @param modifyJob Collect task details - * @return long Job ID - */ - long updateAsyncCollectJob(Job modifyJob); - /** * Update the periodic asynchronous collection tasks that have been delivered * @param modifyJob Collect task details diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java index 868ac259b61..b376922a9f5 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/SchedulerInit.java @@ -52,10 +52,10 @@ public class SchedulerInit implements CommandLineRunner { @Autowired - private CollectorScheduling collectorScheduling; + private CollectorOperationReceiver collectorOperationReceiver; @Autowired - private CollectJobScheduling collectJobScheduling; + private JobOperation jobOperation; private static final String MAIN_COLLECTOR_NODE_IP = "127.0.0.1"; private static final String DEFAULT_COLLECTOR_VERSION = "DEBUG"; @@ -91,7 +91,7 @@ public void run(String... args) throws Exception { .ip(MAIN_COLLECTOR_NODE_IP) .version(DEFAULT_COLLECTOR_VERSION) .build(); - collectorScheduling.collectorGoOnline(CommonConstants.MAIN_COLLECTOR_NODE, collectorInfo); + collectorOperationReceiver.collectorGoOnline(CommonConstants.MAIN_COLLECTOR_NODE, collectorInfo); // init jobs List monitors = monitorDao.findMonitorsByStatusNotInAndJobIdNotNull(List.of(CommonConstants.MONITOR_PAUSED_CODE)); List monitorBinds = collectorMonitorBindDao.findAll(); @@ -136,7 +136,7 @@ public void run(String... args) throws Exception { }); appDefine.setConfigmap(configmaps); String collector = monitorIdCollectorMap.get(monitor.getId()); - long jobId = collectJobScheduling.addAsyncCollectJob(appDefine, collector); + long jobId = jobOperation.addAsyncCollectJob(appDefine, collector); monitor.setJobId(jobId); monitorDao.save(monitor); } catch (Exception e) { diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/collector/CollectorKeeper.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/collector/CollectorKeeper.java new file mode 100644 index 00000000000..f19ccc73b33 --- /dev/null +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/collector/CollectorKeeper.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.scheduler.collector; + +import org.apache.hertzbeat.common.constants.CollectorStatus; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.manager.pojo.CollectorNode; +import org.apache.hertzbeat.manager.scheduler.AssignJobs; + +import java.util.function.BiConsumer; + +/** + * Interface for managing collector nodes and their associated jobs. + * Maintains all collector information and provides operations for managing collectors and job assignments. + */ +public interface CollectorKeeper { + + /** + * Adds a new collector node to the keeper's management pool. + * @param newNode The collector node to be added to the management system + */ + void addNode(CollectorNode newNode); + + /** + * Assigns a monitoring job to a specific collector node. + * @param job The monitoring job to be assigned + * @param collectorId The unique identifier of the target collector node + * @return The collector node that received the job assignment + */ + CollectorNode addJob(Job job, String collectorId); + + /** + * Retrieves a collector node by its unique identifier. + * @param collectorId The unique identifier of the collector node + * @return The collector node matching the given ID, or null if not found + */ + CollectorNode getNode(String collectorId); + + /** + * Determines the most appropriate collector node for a given job based on scheduling logic. + * @param jobId The unique identifier of the job to be assigned + * @return The collector node selected to handle this job + */ + CollectorNode determineNode(Long jobId); + + /** + * Updates the operational status of a collector node. + * @param collectorId The unique identifier of the collector node + * @param collectorStatus The new status to assign to the collector + */ + void changeStatus(String collectorId, CollectorStatus collectorStatus); + + /** + * Rebalances job assignments across collector nodes, typically triggered by status changes. + * Uses a callback mechanism to handle job reassignments. + * @param assignJobCollectorConsumer A biconsumer that handles the job reassignment process, + * taking the job assignment logic and collector ID as parameters + */ + void rebalanceJobs(BiConsumer assignJobCollectorConsumer); + + /** + * Removes a job from whichever collector node it is currently assigned to. + * @param jobId The unique identifier of the job to be removed + * @return The collector node from which the job was removed, or null if job wasn't found + */ + CollectorNode removeJob(Long jobId); +} diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java index 352415b65f0..09e1391cc9c 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -28,7 +28,7 @@ import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.support.CommonThreadPool; import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler; -import org.apache.hertzbeat.manager.scheduler.SchedulerProperties; +import org.apache.hertzbeat.manager.properties.SchedulerProperties; import org.apache.hertzbeat.manager.scheduler.netty.process.CollectCyclicDataResponseProcessor; import org.apache.hertzbeat.manager.scheduler.netty.process.CollectCyclicServiceDiscoveryDataResponseProcessor; import org.apache.hertzbeat.manager.scheduler.netty.process.CollectOneTimeDataResponseProcessor; @@ -45,6 +45,8 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; +import javax.annotation.PreDestroy; + /** * manage server */ @@ -112,6 +114,7 @@ public void start() { }, 10, 3, TimeUnit.SECONDS); } + @PreDestroy public void shutdown() { this.remotingServer.shutdown(); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java index cafa02f291f..c3f2409786f 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java @@ -31,8 +31,9 @@ import org.apache.hertzbeat.common.util.IpDomainUtil; import org.apache.hertzbeat.manager.dao.CollectorDao; import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; +import org.apache.hertzbeat.manager.pojo.CollectorNode; import org.apache.hertzbeat.manager.scheduler.AssignJobs; -import org.apache.hertzbeat.manager.scheduler.ConsistentHash; +import org.apache.hertzbeat.manager.scheduler.ConsistentHashCollectorKeeper; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.manager.service.CollectorService; import org.springframework.beans.factory.annotation.Autowired; @@ -57,7 +58,7 @@ public class CollectorServiceImpl implements CollectorService { private CollectorMonitorBindDao collectorMonitorBindDao; @Autowired - private ConsistentHash consistentHash; + private ConsistentHashCollectorKeeper consistentHashCollectorKeeper; @Autowired(required = false) private ManageServer manageServer; @@ -81,7 +82,7 @@ public Page getCollectors(String name, int pageIndex, Integer List collectorSummaryList = new LinkedList<>(); for (Collector collector : collectors.getContent()) { CollectorSummary.CollectorSummaryBuilder summaryBuilder = CollectorSummary.builder().collector(collector); - ConsistentHash.Node node = consistentHash.getNode(collector.getName()); + CollectorNode node = consistentHashCollectorKeeper.getNode(collector.getName()); if (node != null && node.getAssignJobs() != null) { AssignJobs assignJobs = node.getAssignJobs(); summaryBuilder.pinMonitorNum(assignJobs.getPinnedJobs().size()); diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java index ab185019374..6721d7fe1df 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/MonitorServiceImpl.java @@ -60,7 +60,7 @@ import org.apache.hertzbeat.manager.dao.ParamDao; import org.apache.hertzbeat.manager.pojo.dto.AppCount; import org.apache.hertzbeat.manager.pojo.dto.MonitorDto; -import org.apache.hertzbeat.manager.scheduler.CollectJobScheduling; +import org.apache.hertzbeat.manager.scheduler.JobOperation; import org.apache.hertzbeat.manager.service.AppService; import org.apache.hertzbeat.manager.service.ImExportService; import org.apache.hertzbeat.manager.service.LabelService; @@ -113,7 +113,7 @@ public class MonitorServiceImpl implements MonitorService { @Autowired private AppService appService; @Autowired - private CollectJobScheduling collectJobScheduling; + private JobOperation jobOperation; @Autowired private MonitorDao monitorDao; @Autowired @@ -193,11 +193,10 @@ public void addMonitor(Monitor monitor, List params, String collector, Gr return new Configmap(param.getField(), param.getParamValue(), param.getType()); }).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); - long jobId = collector == null ? collectJobScheduling.addAsyncCollectJob(appDefine, null) : - collectJobScheduling.addAsyncCollectJob(appDefine, collector); - try { - detectMonitor(monitor, params, collector); - } catch (Exception ignored) {} + long jobId = jobOperation.addAsyncCollectJob(appDefine, collector); + + + detectMonitorSafely(monitor, params, collector); try { if (collector != null) { @@ -217,7 +216,7 @@ public void addMonitor(Monitor monitor, List params, String collector, Gr paramDao.saveAll(params); } catch (Exception e) { log.error("Error while adding monitor: {}", e.getMessage(), e); - collectJobScheduling.cancelAsyncCollectJob(jobId); + jobOperation.cancelAsyncCollectJob(jobId); throw new MonitorDatabaseException(e.getMessage()); } } @@ -510,18 +509,11 @@ public void modifyMonitor(Monitor monitor, List params, String collector, List configmaps = params.stream().map(param -> new Configmap(param.getField(), param.getParamValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); - long newJobId; - if (collector == null) { - newJobId = collectJobScheduling.updateAsyncCollectJob(appDefine); - } else { - newJobId = collectJobScheduling.updateAsyncCollectJob(appDefine, collector); - } + long newJobId = jobOperation.updateAsyncCollectJob(appDefine, collector); monitor.setJobId(newJobId); // execute only in non paused status - try { - detectMonitor(monitor, params, collector); - } catch (Exception ignored) {} + detectMonitorSafely(monitor, params, collector); } // After the update is successfully released, refresh the database @@ -548,7 +540,7 @@ public void modifyMonitor(Monitor monitor, List params, String collector, } catch (Exception e) { log.error(e.getMessage(), e); // Repository brushing abnormally cancels the previously delivered task - collectJobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + jobOperation.cancelAsyncCollectJob(monitor.getJobId()); throw new MonitorDatabaseException(e.getMessage()); } } @@ -578,7 +570,7 @@ public void deleteMonitors(Set ids) throws RuntimeException { for (Monitor monitor : monitors) { monitorBindDao.deleteByMonitorId(monitor.getId()); collectorMonitorBindDao.deleteCollectorMonitorBindsByMonitorId(monitor.getId()); - collectJobScheduling.cancelAsyncCollectJob(monitor.getJobId()); + jobOperation.cancelAsyncCollectJob(monitor.getJobId()); applicationContext.publishEvent(new MonitorDeletedEvent(applicationContext, monitor.getId())); } } @@ -692,17 +684,17 @@ public void cancelManageMonitors(Set ids) { // The jobId is not deleted, and the jobId is reused again after the management is started. Set subMonitorIds = monitorBindDao.findMonitorBindsByBizIdIn(ids).stream().map(MonitorBind::getMonitorId).collect(Collectors.toSet()); ids.addAll(subMonitorIds); - List managedMonitors = monitorDao.findMonitorsByIdIn(ids) - .stream().filter(monitor -> - monitor.getStatus() != CommonConstants.MONITOR_PAUSED_CODE) + List managedMonitors = monitorDao.findMonitorsByIdIn(ids).stream() + .filter(monitor -> monitor.getStatus() != CommonConstants.MONITOR_PAUSED_CODE) .peek(monitor -> monitor.setStatus(CommonConstants.MONITOR_PAUSED_CODE)) .collect(Collectors.toList()); - if (!CollectionUtils.isEmpty(managedMonitors)) { - for (Monitor monitor : managedMonitors) { - collectJobScheduling.cancelAsyncCollectJob(monitor.getJobId()); - } - monitorDao.saveAll(managedMonitors); + + if (CollectionUtils.isEmpty(managedMonitors)) { + return; } + + managedMonitors.forEach(monitor -> jobOperation.cancelAsyncCollectJob(monitor.getJobId())); + monitorDao.saveAll(managedMonitors); } @Override @@ -710,9 +702,8 @@ public void enableManageMonitors(Set ids) { // Update monitoring status Add corresponding monitoring periodic task Set subMonitorIds = monitorBindDao.findMonitorBindsByBizIdIn(ids).stream().map(MonitorBind::getMonitorId).collect(Collectors.toSet()); ids.addAll(subMonitorIds); - List unManagedMonitors = monitorDao.findMonitorsByIdIn(ids) - .stream().filter(monitor -> - monitor.getStatus() == CommonConstants.MONITOR_PAUSED_CODE) + List unManagedMonitors = monitorDao.findMonitorsByIdIn(ids).stream() + .filter(monitor -> monitor.getStatus() == CommonConstants.MONITOR_PAUSED_CODE) .peek(monitor -> monitor.setStatus(CommonConstants.MONITOR_UP_CODE)) .collect(Collectors.toList()); if (unManagedMonitors.isEmpty()) { @@ -757,13 +748,11 @@ public void enableManageMonitors(Set ids) { Optional bindOptional = collectorMonitorBindDao.findCollectorMonitorBindByMonitorId(monitor.getId()); String collector = bindOptional.map(CollectorMonitorBind::getCollector).orElse(null); - long newJobId = collectJobScheduling.addAsyncCollectJob(appDefine, collector); + long newJobId = jobOperation.addAsyncCollectJob(appDefine, collector); monitor.setJobId(newJobId); applicationContext.publishEvent(new MonitorDeletedEvent(applicationContext, monitor.getId())); - try { - detectMonitor(monitor, params, collector); - } catch (Exception ignored) { - } + + detectMonitorSafely(monitor, params, collector); } monitorDao.saveAll(unManagedMonitors); } @@ -852,7 +841,7 @@ public void updateAppCollectJob(Job job) { // if is pinned collector String collector = monitorIdCollectorMap.get(monitor.getId()); // Delivering a collection task - long newJobId = collectJobScheduling.updateAsyncCollectJob(appDefine, collector); + long newJobId = jobOperation.updateAsyncCollectJob(appDefine, collector); monitor.setJobId(newJobId); monitorDao.save(monitor); } catch (Exception e) { @@ -932,12 +921,8 @@ private void detectSdMonitor(Monitor monitor, List params, String collect new Configmap(param.getField(), param.getParamValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); appDefine.setSd(true); - List collectRep; - if (collector != null) { - collectRep = collectJobScheduling.collectSyncJobData(appDefine, collector); - } else { - collectRep = collectJobScheduling.collectSyncJobData(appDefine); - } + List collectRep = jobOperation.collectSyncJobData(appDefine, collector); + monitor.setStatus(CommonConstants.MONITOR_UP_CODE); // If the detection result fails, a detection exception is thrown if (collectRep == null || collectRep.isEmpty()) { @@ -975,12 +960,7 @@ private void detectMonitorDirectly(Monitor monitor, List params, String c List availableMetrics = appDefine.getMetrics().stream() .filter(item -> item.getPriority() == 0).collect(Collectors.toList()); appDefine.setMetrics(availableMetrics); - List collectRep; - if (collector != null) { - collectRep = collectJobScheduling.collectSyncJobData(appDefine, collector); - } else { - collectRep = collectJobScheduling.collectSyncJobData(appDefine); - } + List collectRep = jobOperation.collectSyncJobData(appDefine, collector); monitor.setStatus(CommonConstants.MONITOR_UP_CODE); // If the detection result fails, a detection exception is thrown @@ -994,4 +974,10 @@ private void detectMonitorDirectly(Monitor monitor, List params, String c } collectRep.forEach(CollectRep.MetricsData::close); } + + private void detectMonitorSafely(Monitor monitor, List params, String collector) { + try { + detectMonitor(monitor, params, collector); + } catch (Exception ignored) {} + } } diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java index e068c7b7fb2..bf604113e8c 100644 --- a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java +++ b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/CollectorJobSchedulerTest.java @@ -29,6 +29,7 @@ import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; import org.apache.hertzbeat.manager.dao.MonitorDao; import org.apache.hertzbeat.manager.dao.ParamDao; +import org.apache.hertzbeat.manager.pojo.CollectorNode; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.manager.service.AppService; import org.junit.jupiter.api.BeforeEach; @@ -63,7 +64,7 @@ public class CollectorJobSchedulerTest { @InjectMocks private CollectorJobScheduler collectorJobScheduler; @Mock - private ConsistentHash consistentHash; + private ConsistentHashCollectorKeeper consistentHashCollectorKeeper; @Mock private CollectorDao collectorDao; @Mock @@ -85,8 +86,8 @@ void setUp() { public void testCollectSyncJobData() { assertDoesNotThrow(() -> { Job job = new Job(); - when(consistentHash.preDispatchJob(any(String.class))).thenReturn(null); - List list = collectorJobScheduler.collectSyncJobData(job); + when(consistentHashCollectorKeeper.determineNode(any(Long.class))).thenReturn(null); + List list = collectorJobScheduler.collectSyncJobData(job, null); assertEquals(1, list.size()); }); } @@ -127,9 +128,8 @@ public void testCollectorGoOnlineJobMetadataNotEmpty() { appDefine.setParams(Collections.emptyList()); when(appService.getAppDefine(anyString())).thenReturn(appDefine); - ConsistentHash.Node node = new ConsistentHash.Node(identity, collector.getMode(), - collector.getIp(), System.currentTimeMillis(), null); - when(consistentHash.getNode("collector-1")).thenReturn(node); + CollectorNode node = new CollectorNode(identity, collector.getMode(), collector.getIp(), System.currentTimeMillis(), null); + when(consistentHashCollectorKeeper.addJob(appDefine, identity)).thenReturn(node); ManageServer manageServer = mock(ManageServer.class); collectorJobScheduler.setManageServer(manageServer); @@ -138,7 +138,7 @@ public void testCollectorGoOnlineJobMetadataNotEmpty() { // Capture the parameters of sendMsg ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(ClusterMsg.Message.class); - verify(manageServer, atLeastOnce()).sendMsg(eq("collector-1"), msgCaptor.capture()); + verify(manageServer, atLeastOnce()).sendMsg(eq(identity), msgCaptor.capture()); ClusterMsg.Message message = msgCaptor.getValue(); Job job = JsonUtil.fromJson(message.getMsg().toStringUtf8(), Job.class); diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeperTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeperTest.java new file mode 100644 index 00000000000..a27167f56a4 --- /dev/null +++ b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashCollectorKeeperTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.hertzbeat.common.constants.CollectorStatus; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator; +import org.apache.hertzbeat.manager.pojo.CollectorNode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Test case for {@link ConsistentHashCollectorKeeper} + */ +public class ConsistentHashCollectorKeeperTest { + + private ConsistentHashCollectorKeeper consistentHashCollectorKeeper; + + @BeforeEach + void setUp() { + consistentHashCollectorKeeper = new ConsistentHashCollectorKeeper(); + } + + @Test + void testAddNode() { + long jobId1 = SnowFlakeIdGenerator.generateId(); + long jobId2 = SnowFlakeIdGenerator.generateId(); + long jobId3 = SnowFlakeIdGenerator.generateId(); + CollectorNode node1 = new CollectorNode("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); + CollectorNode node2 = new CollectorNode("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10); + consistentHashCollectorKeeper.addNode(node1); + consistentHashCollectorKeeper.determineNode(jobId1); + consistentHashCollectorKeeper.determineNode(jobId2); + consistentHashCollectorKeeper.determineNode(jobId3); + consistentHashCollectorKeeper.addNode(node2); + assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node1.getAssignJobs().getRemovingJobs())); + assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs())); + assertSame(consistentHashCollectorKeeper.getNode("node1"), node1); + assertSame(consistentHashCollectorKeeper.getNode("node2"), node2); + } + + @Test + void testDispatchJob() { + long jobId1 = SnowFlakeIdGenerator.generateId(); + CollectorNode res1 = consistentHashCollectorKeeper.determineNode(jobId1); + assertNull(res1); + CollectorNode node1 = new CollectorNode("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); + consistentHashCollectorKeeper.addNode(node1); + long jobId2 = SnowFlakeIdGenerator.generateId(); + CollectorNode res2 = consistentHashCollectorKeeper.determineNode(jobId2); + assertSame(res2, node1); + assertTrue(consistentHashCollectorKeeper.getDispatchJobCache().isEmpty()); + } + + @Test + void testRemoveNode() { + CollectorNode node1 = new CollectorNode("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); + CollectorNode node2 = new CollectorNode("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10); + consistentHashCollectorKeeper.addNode(node1); + consistentHashCollectorKeeper.addNode(node2); + + long jobId1 = SnowFlakeIdGenerator.generateId(); + long jobId2 = SnowFlakeIdGenerator.generateId(); + long jobId3 = SnowFlakeIdGenerator.generateId(); + long jobId4 = SnowFlakeIdGenerator.generateId(); + consistentHashCollectorKeeper.addJob(Job.builder().id(jobId1).monitorId(jobId1).build(), null); + consistentHashCollectorKeeper.addJob(Job.builder().id(jobId2).monitorId(jobId2).build(), null); + consistentHashCollectorKeeper.addJob(Job.builder().id(jobId3).monitorId(jobId3).build(), null); + consistentHashCollectorKeeper.addJob(Job.builder().id(jobId4).monitorId(jobId4).build(), null); + + + consistentHashCollectorKeeper.changeStatus(node2.getIdentity(), CollectorStatus.OFFLINE); + assertTrue(node1.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs())); + assertSame(consistentHashCollectorKeeper.getNode("node1"), node1); + assertNull(consistentHashCollectorKeeper.getNode("node2")); + consistentHashCollectorKeeper.changeStatus(node1.getIdentity(), CollectorStatus.OFFLINE); + assertEquals(4, consistentHashCollectorKeeper.getDispatchJobCache().size()); + } + +} + diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashTest.java deleted file mode 100644 index 3555f8490dc..00000000000 --- a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/scheduler/ConsistentHashTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hertzbeat.manager.scheduler; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertTrue; -import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** - * Test case for {@link ConsistentHash} - */ -public class ConsistentHashTest { - - private ConsistentHash consistentHash; - - @BeforeEach - void setUp() { - consistentHash = new ConsistentHash(); - } - - @Test - void testAddNode() { - String job1 = "job1"; - long jobId1 = SnowFlakeIdGenerator.generateId(); - String job2 = "job2"; - long jobId2 = SnowFlakeIdGenerator.generateId(); - String job3 = "job3"; - long jobId3 = SnowFlakeIdGenerator.generateId(); - ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); - ConsistentHash.Node node2 = new ConsistentHash.Node("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10); - consistentHash.addNode(node1); - consistentHash.dispatchJob(job1, jobId1); - consistentHash.dispatchJob(job2, jobId2); - consistentHash.dispatchJob(job3, jobId3); - consistentHash.addNode(node2); - assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node1.getAssignJobs().getRemovingJobs())); - assertTrue(node2.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs())); - assertSame(consistentHash.getNode("node1"), node1); - assertSame(consistentHash.getNode("node2"), node2); - } - - @Test - void testDispatchJob() { - String job1 = "job1"; - long jobId1 = SnowFlakeIdGenerator.generateId(); - ConsistentHash.Node res1 = consistentHash.dispatchJob(job1, jobId1); - assertNull(res1); - ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); - consistentHash.addNode(node1); - String job2 = "job2"; - long jobId2 = SnowFlakeIdGenerator.generateId(); - ConsistentHash.Node res2 = consistentHash.dispatchJob(job2, jobId2); - assertSame(res2, node1); - assertTrue(consistentHash.getDispatchJobCache().isEmpty()); - } - - @Test - void testRemoveNode() { - String job1 = "job1"; - long jobId1 = SnowFlakeIdGenerator.generateId(); - String job2 = "job2"; - long jobId2 = SnowFlakeIdGenerator.generateId(); - String job3 = "job3"; - long jobId3 = SnowFlakeIdGenerator.generateId(); - ConsistentHash.Node node1 = new ConsistentHash.Node("node1", "public", "192.168.0.1", System.currentTimeMillis(), (byte) 10); - ConsistentHash.Node node2 = new ConsistentHash.Node("node2", "public", "192.168.0.2", System.currentTimeMillis(), (byte) 10); - consistentHash.addNode(node1); - consistentHash.addNode(node2); - consistentHash.dispatchJob(job1, jobId1); - consistentHash.dispatchJob(job2, jobId2); - consistentHash.dispatchJob(job3, jobId3); - consistentHash.removeNode(node2.getIdentity()); - assertTrue(node1.getAssignJobs().getAddingJobs().containsAll(node2.getAssignJobs().getRemovingJobs())); - assertSame(consistentHash.getNode("node1"), node1); - assertNull(consistentHash.getNode("node2")); - consistentHash.removeNode(node1.getIdentity()); - assertEquals(3, consistentHash.getDispatchJobCache().size()); - } - -} - diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java index f5dc2095b76..77d0bb473f8 100644 --- a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java +++ b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java @@ -32,7 +32,7 @@ import org.apache.hertzbeat.common.support.exception.CommonException; import org.apache.hertzbeat.manager.dao.CollectorDao; import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; -import org.apache.hertzbeat.manager.scheduler.ConsistentHash; +import org.apache.hertzbeat.manager.scheduler.ConsistentHashCollectorKeeper; import org.apache.hertzbeat.manager.scheduler.netty.ManageServer; import org.apache.hertzbeat.manager.service.impl.CollectorServiceImpl; import org.junit.jupiter.api.Test; @@ -60,7 +60,7 @@ public class CollectorServiceTest { private CollectorDao collectorDao; @Mock - private ConsistentHash consistentHash; + private ConsistentHashCollectorKeeper consistentHashCollectorKeeper; @Mock private CollectorMonitorBindDao collectorMonitorBindDao; diff --git a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/MonitorServiceTest.java b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/MonitorServiceTest.java index 7db847a451f..16cb0261179 100644 --- a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/MonitorServiceTest.java +++ b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/MonitorServiceTest.java @@ -46,7 +46,7 @@ import org.apache.hertzbeat.manager.dao.ParamDao; import org.apache.hertzbeat.manager.pojo.dto.AppCount; import org.apache.hertzbeat.manager.pojo.dto.MonitorDto; -import org.apache.hertzbeat.manager.scheduler.CollectJobScheduling; +import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler; import org.apache.hertzbeat.manager.service.impl.MonitorServiceImpl; import org.apache.hertzbeat.manager.support.exception.MonitorDatabaseException; import org.apache.hertzbeat.manager.support.exception.MonitorDetectException; @@ -105,7 +105,7 @@ class MonitorServiceTest { private LabelService tagService; @Mock - private CollectJobScheduling collectJobScheduling; + private CollectorJobScheduler jobOperation; @Mock private AlertDefineBindDao alertDefineBindDao; @@ -143,7 +143,7 @@ void detectMonitorEmpty() { when(appService.getAppDefine(monitor.getApp())).thenReturn(job); List collectRep = new ArrayList<>(); - when(collectJobScheduling.collectSyncJobData(job)).thenReturn(collectRep); + when(jobOperation.collectSyncJobData(job, null)).thenReturn(collectRep); List params = Collections.singletonList(new Param()); assertThrows(MonitorDetectException.class, () -> monitorService.detectMonitor(monitor, params, null)); @@ -170,7 +170,7 @@ void detectMonitorFail() { CollectRep.MetricsData failCode = CollectRep.MetricsData.newBuilder() .setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build(); collectRep.add(failCode); - when(collectJobScheduling.collectSyncJobData(job)).thenReturn(collectRep); + when(jobOperation.collectSyncJobData(job, null)).thenReturn(collectRep); List params = Collections.singletonList(new Param()); assertThrows(MonitorDetectException.class, () -> monitorService.detectMonitor(monitor, params, null)); @@ -186,7 +186,7 @@ void addMonitorSuccess() { .build(); Job job = new Job(); when(appService.getAppDefine(monitor.getApp())).thenReturn(job); - when(collectJobScheduling.addAsyncCollectJob(job, null)).thenReturn(1L); + when(jobOperation.addAsyncCollectJob(job, null)).thenReturn(1L); when(monitorDao.save(monitor)).thenReturn(monitor); List params = Collections.singletonList(new Param()); when(paramDao.saveAll(params)).thenReturn(params); @@ -203,7 +203,7 @@ void addMonitorException() { .build(); Job job = new Job(); when(appService.getAppDefine(monitor.getApp())).thenReturn(job); - when(collectJobScheduling.addAsyncCollectJob(job, null)).thenReturn(1L); + when(jobOperation.addAsyncCollectJob(job, null)).thenReturn(1L); List params = Collections.singletonList(new Param()); when(monitorDao.save(monitor)).thenThrow(RuntimeException.class); assertThrows(MonitorDatabaseException.class, () -> monitorService.addMonitor(monitor, params, null, null));