diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java index 6d9207216b7..6e36a8e593e 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatch.java @@ -21,8 +21,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterWorkerPool; +import org.springframework.beans.factory.annotation.Qualifier; import org.apache.hertzbeat.alert.config.AlertSseManager; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; import org.apache.hertzbeat.common.entity.alerter.NoticeReceiver; @@ -48,15 +50,20 @@ public class AlertNoticeDispatch { private final Map alertNotifyHandlerMap; private final PluginRunner pluginRunner; private final AlertSseManager emitterManager; + private final Executor restTemplateThreadPool; public AlertNoticeDispatch(AlerterWorkerPool workerPool, NoticeConfigService noticeConfigService, AlertStoreHandler alertStoreHandler, - List alertNotifyHandlerList, PluginRunner pluginRunner, AlertSseManager emitterManager) { + List alertNotifyHandlerList, + PluginRunner pluginRunner, + AlertSseManager emitterManager, + @Qualifier("restTemplateThreadPool") Executor restTemplateThreadPool) { this.workerPool = workerPool; this.noticeConfigService = noticeConfigService; this.alertStoreHandler = alertStoreHandler; this.pluginRunner = pluginRunner; + this.restTemplateThreadPool = restTemplateThreadPool; alertNotifyHandlerMap = Maps.newHashMapWithExpectedSize(alertNotifyHandlerList.size()); this.emitterManager = emitterManager; alertNotifyHandlerList.forEach(r -> alertNotifyHandlerMap.put(r.type(), r)); @@ -121,14 +128,16 @@ public void dispatchAlarm(GroupAlert groupAlert) { } private void sendNotify(GroupAlert alert) { - matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> rule.getReceiverId() - .forEach(receiverId -> { + matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> { + rule.getReceiverId().forEach(receiverId -> { + restTemplateThreadPool.execute(() -> { try { - sendNoticeMsg(getOneReceiverById(receiverId), - getOneTemplateById(rule.getTemplateId()), alert); - } catch (AlertNoticeException e) { - log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage()); + sendNoticeMsg(getOneReceiverById(receiverId), getOneTemplateById(rule.getTemplateId()), alert); + } catch (Exception e) { + log.warn("Async notification failed for receiver {}: {}", receiverId, e.getMessage()); } - })))); + }); + }); + }))); } } diff --git a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java index a37b9a26843..ed3b140f9b9 100644 --- a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java +++ b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/notice/impl/AbstractAlertNotifyHandlerImpl.java @@ -26,6 +26,7 @@ import java.util.Locale; import java.util.Map; import java.util.ResourceBundle; +import java.util.concurrent.Executor; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.alert.AlerterProperties; import org.apache.hertzbeat.common.entity.alerter.GroupAlert; @@ -34,6 +35,7 @@ import org.apache.hertzbeat.common.util.ResourceBundleUtil; import org.apache.hertzbeat.alert.notice.AlertNotifyHandler; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.event.EventListener; import org.springframework.ui.freemarker.FreeMarkerTemplateUtils; import org.springframework.web.client.RestTemplate; @@ -51,7 +53,6 @@ abstract class AbstractAlertNotifyHandlerImpl implements AlertNotifyHandler { @Autowired protected AlerterProperties alerterProperties; - protected String renderContent(NoticeTemplate noticeTemplate, GroupAlert alert) throws TemplateException, IOException { StringTemplateLoader stringLoader = new StringTemplateLoader(); freemarker.template.Template templateRes; diff --git a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java index 8102afcdc0b..b85477fd87c 100644 --- a/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java +++ b/hertzbeat-alerter/src/test/java/org/apache/hertzbeat/alert/notice/AlertNoticeDispatchTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.Executor; import java.util.Collections; import java.util.List; import org.apache.hertzbeat.alert.AlerterWorkerPool; @@ -64,6 +65,9 @@ class AlertNoticeDispatchTest { @Mock private AlertSseManager emitterManager; + @Mock + private Executor restTemplateThreadPool; + private AlertNoticeDispatch alertNoticeDispatch; private static final int DISPATCH_THREADS = 3; @@ -82,7 +86,8 @@ void setUp() { alertStoreHandler, alertNotifyHandlerList, pluginRunner, - emitterManager + emitterManager, + restTemplateThreadPool ); receiver = NoticeReceiver.builder() diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java index d856c1d9b78..1c3f7cbe817 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/config/RestTemplateConfig.java @@ -18,6 +18,9 @@ package org.apache.hertzbeat.manager.config; import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import okhttp3.ConnectionPool; import okhttp3.OkHttpClient; @@ -30,7 +33,6 @@ /** * restTemplate config - * todo thread pool */ @Configuration public class RestTemplateConfig { @@ -58,4 +60,16 @@ public ClientHttpRequestFactory simpleClientHttpRequestFactory() { ); } + @Bean("restTemplateThreadPool") + public Executor restTemplateThreadPool() { + return new ThreadPoolExecutor( + 2, + Integer.MAX_VALUE, + 60L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + r -> new Thread(r, "RestTemplate-" + r.hashCode()), + new ThreadPoolExecutor.CallerRunsPolicy() + ); + } }