Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.common.constants;

/**
* Enum representing the possible statuses of a collector.
*/
public enum CollectorStatus {
ONLINE, OFFLINE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hertzbeat.manager;

import javax.annotation.PostConstruct;

import org.apache.hertzbeat.common.constants.ConfigConstants;
import org.apache.hertzbeat.manager.nativex.HertzbeatRuntimeHintsRegistrar;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
Expand All @@ -36,10 +38,10 @@

@SpringBootApplication
@EnableJpaAuditing
@EnableJpaRepositories(basePackages = {"org.apache.hertzbeat"})
@EntityScan(basePackages = {"org.apache.hertzbeat"})
@ComponentScan(basePackages = {"org.apache.hertzbeat"})
@ConfigurationPropertiesScan(basePackages = {"org.apache.hertzbeat"})
@EnableJpaRepositories(basePackages = {ConfigConstants.PkgConstant.PKG})
@EntityScan(basePackages = {ConfigConstants.PkgConstant.PKG})
@ComponentScan(basePackages = {ConfigConstants.PkgConstant.PKG})
@ConfigurationPropertiesScan(basePackages = {ConfigConstants.PkgConstant.PKG})
@ImportRuntimeHints(HertzbeatRuntimeHintsRegistrar.class)
@EnableAsync
@EnableScheduling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.hertzbeat.common.entity.manager.StatusPageComponent;
import org.apache.hertzbeat.common.entity.manager.StatusPageHistory;
import org.apache.hertzbeat.common.entity.manager.StatusPageOrg;
import org.apache.hertzbeat.manager.config.StatusProperties;
import org.apache.hertzbeat.manager.properties.StatusProperties;
import org.apache.hertzbeat.manager.dao.MonitorDao;
import org.apache.hertzbeat.manager.dao.StatusPageComponentDao;
import org.apache.hertzbeat.manager.dao.StatusPageHistoryDao;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.hertzbeat.manager.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.manager.scheduler.ConsistentHash;
import org.apache.hertzbeat.manager.scheduler.SchedulerProperties;
import org.apache.hertzbeat.manager.scheduler.ConsistentHashCollectorKeeper;
import org.apache.hertzbeat.manager.properties.SchedulerProperties;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -33,8 +33,8 @@
public class SchedulerConfig {

@Bean
public ConsistentHash consistentHasInstance() {
return new ConsistentHash();
public ConsistentHashCollectorKeeper consistentHasInstance() {
return new ConsistentHashCollectorKeeper();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.hertzbeat.manager.nativex;

import java.lang.reflect.Constructor;
import java.util.Set;
import org.apache.sshd.common.channel.ChannelListener;
import org.apache.sshd.common.forward.PortForwardingEventListener;
Expand All @@ -28,7 +27,6 @@
import org.apache.sshd.common.session.SessionListener;
import org.apache.sshd.common.util.security.bouncycastle.BouncyCastleSecurityProviderRegistrar;
import org.apache.sshd.common.util.security.eddsa.EdDSASecurityProviderRegistrar;
import org.springframework.aot.hint.ExecutableMode;
import org.springframework.aot.hint.MemberCategory;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
Expand Down Expand Up @@ -60,11 +58,4 @@ public void registerHints(@NonNull RuntimeHints hints, ClassLoader classLoader)
TypeReference.of(PortForwardingEventListener.class), TypeReference.of(SessionListener.class));
}
}

private void registerConstructor(RuntimeHints hints, Class<?> clazz) {
Constructor<?>[] declaredConstructors = clazz.getDeclaredConstructors();
for (Constructor<?> declaredConstructor : declaredConstructors) {
hints.reflection().registerConstructor(declaredConstructor, ExecutableMode.INVOKE);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.manager.pojo;

import lombok.Data;
import org.apache.hertzbeat.common.constants.CollectorStatus;
import org.apache.hertzbeat.manager.scheduler.AssignJobs;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Collector Node
*/
@Data
public class CollectorNode {
/**
* Default number of VM nodes
*/
private static final byte VIRTUAL_NODE_DEFAULT_SIZE = 10;

/**
* collector identity
*/
private final String identity;

/**
* collector mode: public or private
*/
private String mode;

/**
* ip
*/
private String ip;

/**
* collector On-line time stamp
*/
private long uptime;

/**
* collector's own performance service quality score 0 - 127
* The number of virtual nodes will be calculated based on this service quality score
*
*/
private Byte quality;

private CollectorStatus collectorStatus;

/**
* use this collector's collect job ID list
* jobId,jobVersion
*/
private AssignJobs assignJobs;

/**
* the collection task ID list mapped by each virtual node corresponding to this node
* Long[] [0]-jobId, [1]-dispatchHash
*/
private Map<Integer, Set<Long[]>> virtualNodeMap;

public CollectorNode(String identity, String mode, String ip, long uptime, Byte quality) {
this.identity = identity;
this.mode = mode;
this.ip = ip;
this.uptime = uptime;
this.quality = quality;
assignJobs = new AssignJobs();
virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE);
}

public synchronized void addJob(Integer virtualNodeKey, Integer dispatchHash, Long jobId, boolean isFlushed) {
if (virtualNodeMap == null) {
virtualNodeMap = new ConcurrentHashMap<>(VIRTUAL_NODE_DEFAULT_SIZE);
}
if (assignJobs == null) {
assignJobs = new AssignJobs();
}
Set<Long[]> virtualNodeJob = virtualNodeMap.computeIfAbsent(virtualNodeKey, k -> ConcurrentHashMap.newKeySet(16));
virtualNodeJob.add(new Long[]{jobId, dispatchHash.longValue()});
if (isFlushed) {
assignJobs.addAssignJob(jobId);
} else {
assignJobs.addAddingJob(jobId);
}
}

/**
* obtain the collection task routed by the specified virtual node according to virtualNodeKey
* @param virtualNodeKey virtualNodeKey
* @return collection task
*/
public Set<Long[]> clearVirtualNodeJobs(Integer virtualNodeKey) {
if (virtualNodeMap == null || virtualNodeMap.isEmpty()) {
return null;
}
Set<Long[]> virtualNodeJobs = virtualNodeMap.remove(virtualNodeKey);
virtualNodeMap.put(virtualNodeKey, ConcurrentHashMap.newKeySet(16));
return virtualNodeJobs;
}

public void addVirtualNodeJobs(Integer virtualHashKey, Set<Long[]> reDispatchJobs) {
if (reDispatchJobs == null) {
return;
}
if (virtualNodeMap == null) {
virtualNodeMap = new ConcurrentHashMap<>(16);
}
virtualNodeMap.computeIfPresent(virtualHashKey, (k, v) -> {
reDispatchJobs.addAll(v);
return v;
});
virtualNodeMap.put(virtualHashKey, reDispatchJobs);
}

public void removeVirtualNodeJob(Long jobId) {
if (jobId == null || virtualNodeMap == null) {
return;
}
for (Set<Long[]> jobSet : virtualNodeMap.values()) {
Optional<Long[]> optional = jobSet.stream().filter(item -> Objects.equals(item[0], jobId)).findFirst();
if (optional.isPresent()) {
jobSet.remove(optional.get());
break;
}
}
}

public void destroy() {
if (assignJobs != null) {
assignJobs.clear();
}
if (virtualNodeMap != null) {
virtualNodeMap.clear();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hertzbeat.manager.pojo;

import org.apache.hertzbeat.common.entity.job.Job;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Utility class for caching {@link Job} objects in memory.
* <p>
* This class provides static methods to store, retrieve, and remove {@code Job} instances
* using a thread-safe {@link ConcurrentHashMap}. It is intended to be used as a simple
* in-memory cache for job data within the manager component.
* <p>
* Usage:
* <pre>
* JobCache.put(job);
* Job job = JobCache.get(jobId);
* JobCache.remove(jobId);
* </pre>
*/
public class JobCache {
private static final Map<Long, Job> jobContentCache = new ConcurrentHashMap<>(16);

public static Job get(Long jobId) {
return jobContentCache.get(jobId);
}

public static void put(Job job) {
jobContentCache.put(job.getId(), job);
}

public static void remove(Long jobId) {
jobContentCache.remove(jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.hertzbeat.manager.scheduler;
package org.apache.hertzbeat.manager.properties;

import lombok.Getter;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.hertzbeat.manager.config;
package org.apache.hertzbeat.manager.properties;

import lombok.Getter;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
@Slf4j
@Data
public class AssignJobs {
private static final Integer DEFAULT_CAPACITY = 16;

/**
* current assign jobIds
Expand All @@ -51,10 +52,10 @@ public class AssignJobs {
private Set<Long> pinnedJobs;

public AssignJobs() {
jobs = ConcurrentHashMap.newKeySet(16);
addingJobs = ConcurrentHashMap.newKeySet(16);
removingJobs = ConcurrentHashMap.newKeySet(16);
pinnedJobs = ConcurrentHashMap.newKeySet(16);
jobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY);
addingJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY);
removingJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY);
pinnedJobs = ConcurrentHashMap.newKeySet(DEFAULT_CAPACITY);
}

public void addAssignJob(Long jobId) {
Expand Down
Loading
Loading