From e9806b37b775423464c63aa31867bf53d2e3c15b Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 4 Jul 2023 11:32:35 +0800 Subject: [PATCH 01/13] feature: add opentelemry --- adapter/runtime/pom.xml | 6 + .../eventbridge/adapter/runtime/Runtime.java | 8 +- .../runtime/boot/EventBusListener.java | 12 +- .../runtime/boot/EventRuleTransfer.java | 5 +- .../runtime/boot/EventTargetTrigger.java | 5 +- .../boot/listener/EventSubscriber.java | 8 + .../runtimer/RocketMQEventSubscriber.java | 20 +- common/pom.xml | 22 ++ .../eventbridge/metrics/NopLongCounter.java | 19 ++ .../eventbridge/metrics/NopLongHistogram.java | 19 ++ .../metrics/NopLongUpDownCounter.java | 20 ++ .../metrics/NopObservableLongGauge.java | 6 + metrics/pom.xml | 121 +++++++ .../rocketmq/eventbridge/BridgeConfig.java | 152 +++++++++ .../eventbridge/BridgeMetricsConstant.java | 22 ++ .../eventbridge/BridgeMetricsManager.java | 322 ++++++++++++++++++ .../org/apache/rocketmq/eventbridge/Pair.java | 27 ++ pom.xml | 1 + 18 files changed, 785 insertions(+), 10 deletions(-) create mode 100644 common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java create mode 100644 common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java create mode 100644 common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java create mode 100644 common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java create mode 100644 metrics/pom.xml create mode 100644 metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java create mode 100644 metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java create mode 100644 metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java create mode 100644 metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java diff --git a/adapter/runtime/pom.xml b/adapter/runtime/pom.xml index a3f501fd..ff07354c 100644 --- a/adapter/runtime/pom.xml +++ b/adapter/runtime/pom.xml @@ -101,6 +101,12 @@ 4.0.3 test + + org.apache.rocketmq.eventbridge + rocketmq-eventbridge-metrics + 1.0.0 + compile + diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java index af8bbbd3..1a90ca34 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtime; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventBusListener; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventRuleTransfer; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventTargetTrigger; @@ -69,9 +70,10 @@ public void initAndStart() throws Exception { circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig()); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); - EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler); - EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler); - EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler); + BridgeMetricsManager metricsManager = eventSubscriber.getMetricsManager(); + EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler, metricsManager); + EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler, metricsManager); + EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler, metricsManager); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index 48af27f3..3b2fb45c 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -19,8 +19,13 @@ import com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; + +import java.util.ArrayList; import java.util.List; +import java.util.Optional; + import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; @@ -40,12 +45,14 @@ public class EventBusListener extends ServiceThread { private final CirculatorContext circulatorContext; private final EventSubscriber eventSubscriber; private final ErrorHandler errorHandler; + private BridgeMetricsManager metricsManager; public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber, - ErrorHandler errorHandler) { + ErrorHandler errorHandler, BridgeMetricsManager metricsManager) { this.circulatorContext = circulatorContext; this.eventSubscriber = eventSubscriber; this.errorHandler = errorHandler; + this.metricsManager = metricsManager; } @Override @@ -53,7 +60,8 @@ public void run() { while (!stopped) { List pullRecordList = Lists.newArrayList(); try { - pullRecordList = eventSubscriber.pull(); + pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>()); + BridgeMetricsManager.messagesInTotal.add(pullRecordList.size()); if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); continue; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 79dfd00f..2621c14d 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; @@ -47,12 +48,14 @@ public class EventRuleTransfer extends ServiceThread { private final CirculatorContext circulatorContext; private final OffsetManager offsetManager; private final ErrorHandler errorHandler; + private BridgeMetricsManager metricsManager; public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager, - ErrorHandler errorHandler) { + ErrorHandler errorHandler, BridgeMetricsManager metricsManager) { this.circulatorContext = circulatorContext; this.offsetManager = offsetManager; this.errorHandler = errorHandler; + this.metricsManager = metricsManager; } @Override diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index 85dae3b8..838e8948 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; @@ -47,12 +48,14 @@ public class EventTargetTrigger extends ServiceThread { private final OffsetManager offsetManager; private final ErrorHandler errorHandler; private volatile Integer batchSize = 100; + private BridgeMetricsManager metricsManager; public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager, - ErrorHandler errorHandler) { + ErrorHandler errorHandler, BridgeMetricsManager metricsManager) { this.circulatorContext = circulatorContext; this.offsetManager = offsetManager; this.errorHandler = errorHandler; + this.metricsManager = metricsManager; } @Override diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java index be4db9bb..772d9ef2 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener; import io.openmessaging.connector.api.data.ConnectRecord; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.TargetRunnerListener; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; @@ -34,6 +35,13 @@ public abstract class EventSubscriber implements TargetRunnerListener { */ public abstract void refresh(SubscribeRunnerKeys subscribeRunnerKeys, RefreshTypeEnum refreshTypeEnum); + + /** + * fetch metrics configuration + * @return + */ + public abstract BridgeMetricsManager getMetricsManager(); + /** * Pull connect records from store, Blocking method when is empty. * diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index df9cda70..683db76f 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -36,6 +36,8 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.utils.NetworkUtil; +import org.apache.rocketmq.eventbridge.BridgeConfig; +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys; @@ -86,6 +88,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { private Integer pullTimeOut; private Integer pullBatchSize; + private BridgeConfig bridgeConfig; private ClientConfig clientConfig; private SessionCredentials sessionCredentials; private String socksProxy; @@ -119,6 +122,13 @@ public void refresh(SubscribeRunnerKeys subscribeRunnerKeys, RefreshTypeEnum ref } } + @Override + public BridgeMetricsManager getMetricsManager() { + BridgeMetricsManager metricsManager = new BridgeMetricsManager(bridgeConfig); + return metricsManager; + } + + @Override public List pull() { ArrayList messages = new ArrayList<>(); @@ -187,13 +197,17 @@ private void initMqProperties() { String socks5Password = properties.getProperty("rocketmq.consumer.socks5Password"); String socks5Endpoint = properties.getProperty("rocketmq.consumer.socks5Endpoint"); + String metricsAddress = properties.getProperty("metrics.endpoint.address"); + String metricsCollectorMode = properties.getProperty("metrics.collector.mode"); clientConfig.setNameSrvAddr(namesrvAddr); - clientConfig.setConsumerGroup(StringUtils.isBlank(consumerGroup) ? - createGroupName(SYS_DEFAULT_GROUP) : consumerGroup); clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ? - AccessChannel.CLOUD : AccessChannel.LOCAL); + AccessChannel.CLOUD : AccessChannel.LOCAL); clientConfig.setNamespace(namespace); + BridgeConfig bridgeConfig = new BridgeConfig(); + bridgeConfig.setEventBridgeAddress(metricsAddress); + bridgeConfig.setMetricsExporterType(Integer.parseInt(metricsCollectorMode)); this.clientConfig = clientConfig; + this.bridgeConfig = bridgeConfig; if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { this.sessionCredentials = new SessionCredentials(accessKey, secretKey); diff --git a/common/pom.xml b/common/pom.xml index a2e78fc6..89dff4f3 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -23,6 +23,8 @@ 8 8 + 1.19.0 + 1.19.0-alpha @@ -68,5 +70,25 @@ assertj-core test + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-exporter-prometheus + ${opentelemetry-exporter-prometheus.version} + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java new file mode 100644 index 00000000..fa48bc19 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java @@ -0,0 +1,19 @@ +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.context.Context; + +public class NopLongCounter implements LongCounter { + @Override public void add(long l) { + + } + + @Override public void add(long l, Attributes attributes) { + + } + + @Override public void add(long l, Attributes attributes, Context context) { + + } +} diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java new file mode 100644 index 00000000..f55986c7 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java @@ -0,0 +1,19 @@ +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.context.Context; + +public class NopLongHistogram implements LongHistogram { + @Override public void record(long l) { + + } + + @Override public void record(long l, Attributes attributes) { + + } + + @Override public void record(long l, Attributes attributes, Context context) { + + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java new file mode 100644 index 00000000..f5ef4296 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java @@ -0,0 +1,20 @@ +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.context.Context; + + +public class NopLongUpDownCounter implements LongUpDownCounter { + @Override public void add(long l) { + + } + + @Override public void add(long l, Attributes attributes) { + + } + + @Override public void add(long l, Attributes attributes, Context context) { + + } +} \ No newline at end of file diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java new file mode 100644 index 00000000..17cbbc3a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java @@ -0,0 +1,6 @@ +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.metrics.ObservableLongGauge; + +public class NopObservableLongGauge implements ObservableLongGauge { +} diff --git a/metrics/pom.xml b/metrics/pom.xml new file mode 100644 index 00000000..7eb23ce4 --- /dev/null +++ b/metrics/pom.xml @@ -0,0 +1,121 @@ + + + + + rocketmq-eventbridge + org.apache.rocketmq + 1.0.0 + + 4.0.0 + + org.apache.rocketmq.eventbridge + rocketmq-eventbridge-metrics + + rocketmq-eventbridge-metrics + + http://www.example.com + + + UTF-8 + 1.19.0 + 1.19.0-alpha + + + + + org.apache.rocketmq + rocketmq-eventbridge-common + 1.0.0 + provided + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + com.squareup.okio + okio-jvm + + + + + io.opentelemetry + opentelemetry-exporter-prometheus + ${opentelemetry-exporter-prometheus.version} + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + junit + junit + 4.11 + test + + + io.github.aliyunmq + rocketmq-shaded-slf4j-api-bridge + 1.0.0 + + + org.slf4j + jul-to-slf4j + 2.0.6 + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java new file mode 100644 index 00000000..27c3ae9a --- /dev/null +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java @@ -0,0 +1,152 @@ +package org.apache.rocketmq.eventbridge; + +public class BridgeConfig { + public enum MetricsExporterType { + DISABLE(0), + OTLP_GRPC(1), + PROM(2), + LOG(3); + + private final int value; + + MetricsExporterType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static MetricsExporterType valueOf(int value) { + switch (value) { + case 1: + return OTLP_GRPC; + case 2: + return PROM; + case 3: + return LOG; + default: + return DISABLE; + } + } + + public boolean isEnable() { + return this.value > 0; + } + } + + private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE; + + private String metricsGrpcExporterTarget = ""; + private String metricsGrpcExporterHeader = ""; + private long metricGrpcExporterTimeOutInMills = 3 * 1000; + private long metricGrpcExporterIntervalInMills = 60 * 1000; + private long metricLoggingExporterIntervalInMills = 10 * 1000; + + private int metricsPromExporterPort = 5557; + private String metricsPromExporterHost = ""; + + // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx + private String metricsLabel = ""; + + private boolean metricsInDelta = false; + + private String eventBridgeAddress; + + + public MetricsExporterType getMetricsExporterType() { + return metricsExporterType; + } + + public void setMetricsExporterType(MetricsExporterType metricsExporterType) { + this.metricsExporterType = metricsExporterType; + } + + public void setMetricsExporterType(int metricsExporterType) { + this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType); + } + + public void setMetricsExporterType(String metricsExporterType) { + this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType); + } + + public String getMetricsGrpcExporterTarget() { + return metricsGrpcExporterTarget; + } + + public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) { + this.metricsGrpcExporterTarget = metricsGrpcExporterTarget; + } + + public String getMetricsGrpcExporterHeader() { + return metricsGrpcExporterHeader; + } + + public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) { + this.metricsGrpcExporterHeader = metricsGrpcExporterHeader; + } + + public long getMetricGrpcExporterTimeOutInMills() { + return metricGrpcExporterTimeOutInMills; + } + + public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) { + this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills; + } + + public long getMetricGrpcExporterIntervalInMills() { + return metricGrpcExporterIntervalInMills; + } + + public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) { + this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills; + } + + public long getMetricLoggingExporterIntervalInMills() { + return metricLoggingExporterIntervalInMills; + } + + public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) { + this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills; + } + + public String getMetricsLabel() { + return metricsLabel; + } + + public void setMetricsLabel(String metricsLabel) { + this.metricsLabel = metricsLabel; + } + + public boolean isMetricsInDelta() { + return metricsInDelta; + } + + public void setMetricsInDelta(boolean metricsInDelta) { + this.metricsInDelta = metricsInDelta; + } + + public int getMetricsPromExporterPort() { + return metricsPromExporterPort; + } + + public void setMetricsPromExporterPort(int metricsPromExporterPort) { + this.metricsPromExporterPort = metricsPromExporterPort; + } + + public String getMetricsPromExporterHost() { + return metricsPromExporterHost; + } + + public void setMetricsPromExporterHost(String metricsPromExporterHost) { + this.metricsPromExporterHost = metricsPromExporterHost; + } + + public String getEventBridgeAddress() { + return eventBridgeAddress; + } + + public void setEventBridgeAddress(String eventBridgeAddress) { + this.eventBridgeAddress = eventBridgeAddress; + } +} diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java new file mode 100644 index 00000000..90a38385 --- /dev/null +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java @@ -0,0 +1,22 @@ +package org.apache.rocketmq.eventbridge; + +public class BridgeMetricsConstant { + public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter"; + + public static final String GAUGE_PROCESSOR_GAUGE = "target_queue_gauge"; + public static final String RULE_QUEUE_GAUGE = "rule_queue_gauge"; + + public static final String COUNTER_MESSAGES_IN_TOTAL = "eventbridge_messages_in_total"; + public static final String COUNTER_MESSAGES_OUT_TOTAL = "eventbridge_messages_out_total"; + public static final String COUNTER_THROUGHPUT_IN_TOTAL = "eventbridge_throughput_in_total"; + public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "eventbridge_throughput_out_total"; + public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; + + + /** eventbridge process message latency**/ + public static final String HISTOGRAM_RPC_LATENCY = "process_latency"; + + public static final String LABEL_AGGREGATION = "aggregation"; + public static final String AGGREGATION_DELTA = "delta"; + public static final String LABEL_PROCESSOR = "processor"; +} diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java new file mode 100644 index 00000000..ff1bc684 --- /dev/null +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -0,0 +1,322 @@ +package org.apache.rocketmq.eventbridge; + +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.exporter.logging.LoggingMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; + +import org.apache.rocketmq.eventbridge.metrics.NopLongCounter; +import org.apache.rocketmq.eventbridge.metrics.NopLongHistogram; +import org.apache.rocketmq.eventbridge.metrics.NopObservableLongGauge; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import static org.apache.rocketmq.eventbridge.BridgeMetricsConstant.*; + + +public class BridgeMetricsManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class); + + private final BridgeConfig bridgeConfig; + private final static Map LABEL_MAP = new HashMap<>(); + private OtlpGrpcMetricExporter metricExporter; + private PeriodicMetricReader periodicMetricReader; + private PrometheusHttpServer prometheusHttpServer; + private LoggingMetricExporter loggingMetricExporter; + private Meter bridgeMeter; + + // queue stats metrics + public static ObservableLongGauge targetGauge = new NopObservableLongGauge(); + public static ObservableLongGauge ruleGauge = new NopObservableLongGauge(); + + + //invoke timeout + public static LongHistogram invokeLatency = new NopLongHistogram(); + + // request metrics + public static LongCounter messagesInTotal = new NopLongCounter(); + public static LongCounter messagesOutTotal = new NopLongCounter(); + public static LongCounter throughputInTotal = new NopLongCounter(); + public static LongCounter throughputOutTotal = new NopLongCounter(); + public static LongHistogram messageSize = new NopLongHistogram(); + + public BridgeMetricsManager(BridgeConfig bridgeConfig) { + this.bridgeConfig = bridgeConfig; + } + + + public static AttributesBuilder newAttributesBuilder() { + AttributesBuilder attributesBuilder = Attributes.builder(); + LABEL_MAP.forEach(attributesBuilder::put); + return attributesBuilder; + } + + private boolean checkConfig() { + if (bridgeConfig == null) { + return false; + } + BridgeConfig.MetricsExporterType exporterType = bridgeConfig.getMetricsExporterType(); + if (!exporterType.isEnable()) { + return false; + } + + switch (exporterType) { + case OTLP_GRPC: + return StringUtils.isNotBlank(bridgeConfig.getMetricsGrpcExporterTarget()); + case PROM: + return true; + case LOG: + return true; + } + return false; + } + + public void init() { + BridgeConfig.MetricsExporterType metricsExporterType = bridgeConfig.getMetricsExporterType(); + if (metricsExporterType == BridgeConfig.MetricsExporterType.DISABLE) { + return; + } + + if (!checkConfig()) { + LOGGER.error("check metrics config failed, will not export metrics"); + return; + } + + String labels = bridgeConfig.getMetricsLabel(); + if (StringUtils.isNotBlank(labels)) { + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(labels); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + LOGGER.warn("metricsLabel is not valid: {}", labels); + continue; + } + LABEL_MAP.put(split[0], split[1]); + } + } + if (bridgeConfig.isMetricsInDelta()) { + LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA); + } + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(Resource.empty()); + + if (metricsExporterType == BridgeConfig.MetricsExporterType.OTLP_GRPC) { + String endpoint = bridgeConfig.getMetricsGrpcExporterTarget(); + if (!endpoint.startsWith("http")) { + endpoint = "https://" + endpoint; + } + OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(bridgeConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setAggregationTemporalitySelector(type -> { + if (bridgeConfig.isMetricsInDelta() && + (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { + return AggregationTemporality.DELTA; + } + return AggregationTemporality.CUMULATIVE; + }); + + String headers = bridgeConfig.getMetricsGrpcExporterHeader(); + if (StringUtils.isNotBlank(headers)) { + Map headerMap = new HashMap<>(); + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers); + continue; + } + headerMap.put(split[0], split[1]); + } + headerMap.forEach(metricExporterBuilder::addHeader); + } + + metricExporter = metricExporterBuilder.build(); + + periodicMetricReader = PeriodicMetricReader.builder(metricExporter) + .setInterval(bridgeConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + + providerBuilder.registerMetricReader(periodicMetricReader); + } + + if (metricsExporterType == BridgeConfig.MetricsExporterType.PROM) { + String promExporterHost = bridgeConfig.getMetricsPromExporterHost(); + if (StringUtils.isBlank(promExporterHost)) { + promExporterHost = bridgeConfig.getEventBridgeAddress(); + } + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(bridgeConfig.getMetricsPromExporterPort()) + .build(); + providerBuilder.registerMetricReader(prometheusHttpServer); + } + + if (metricsExporterType == BridgeConfig.MetricsExporterType.LOG) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + loggingMetricExporter = LoggingMetricExporter.create(bridgeConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); + java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); + periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) + .setInterval(bridgeConfig.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + providerBuilder.registerMetricReader(periodicMetricReader); + } + + registerMetricsView(providerBuilder); + + bridgeMeter = OpenTelemetrySdk.builder() + .setMeterProvider(providerBuilder.build()) + .build() + .getMeter(OPEN_TELEMETRY_METER_NAME); + + initRequestMetrics(); + initRuleMetrics(bridgeMeter); + } + + private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { + // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M + List messageSizeBuckets = Arrays.asList( + 1d * 1024, //1KB + 4d * 1024, //4KB + 512d * 1024, //512KB + 1d * 1024 * 1024, //1MB + 2d * 1024 * 1024, //2MB + 4d * 1024 * 1024 //4MB + ); + InstrumentSelector messageSizeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_MESSAGE_SIZE) + .build(); + View messageSizeView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) + .build(); + providerBuilder.registerView(messageSizeSelector, messageSizeView); + + for (Pair selectorViewPair : getMetricsView()) { + providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2()); + } + } + + + public void initTriggerMetrics(List eventTargetQueue, String label) { + + targetGauge = bridgeMeter.gaugeBuilder(GAUGE_PROCESSOR_GAUGE) + .setDescription("Request processor gauge") + .ofLongs() + .buildWithCallback(measurement -> { + measurement.record(eventTargetQueue.size(), newAttributesBuilder().put(label, "target").build()); + }); + } + + public void initRuleMetrics(List eventRuleQueue, String label) { + ruleGauge = bridgeMeter.gaugeBuilder(RULE_QUEUE_GAUGE) + .setDescription("Request processor gauge") + .ofLongs() + .buildWithCallback(measurement -> { + measurement.record(eventRuleQueue.size(), newAttributesBuilder().put(label, "rule").build()); + }); + + } + + private void initRequestMetrics() { + messagesInTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL) + .setDescription("Total number of incoming messages") + .build(); + + messagesOutTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL) + .setDescription("Total number of outgoing messages") + .build(); + + throughputInTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL) + .setDescription("Total traffic of incoming messages") + .build(); + + throughputOutTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL) + .setDescription("Total traffic of outgoing messages") + .build(); + + messageSize = bridgeMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE) + .setDescription("Incoming messages size") + .ofLongs() + .build(); + } + + public static void initRuleMetrics(Meter meter) { + invokeLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY) + .setDescription("invoke latency") + .setUnit("milliseconds") + .ofLongs() + .build(); + + } + + public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(3).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(7).toMillis(), + (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofSeconds(1).toMillis(), + (double) Duration.ofSeconds(2).toMillis(), + (double) Duration.ofSeconds(3).toMillis() + ); + InstrumentSelector selector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_RPC_LATENCY) + .build(); + View view = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) + .build(); + return Lists.newArrayList(new Pair<>(selector, view)); + } + + + public void shutdown() { + if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.OTLP_GRPC) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + metricExporter.shutdown(); + } + if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.PROM) { + prometheusHttpServer.forceFlush(); + prometheusHttpServer.shutdown(); + } + if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.LOG) { + periodicMetricReader.forceFlush(); + periodicMetricReader.shutdown(); + loggingMetricExporter.shutdown(); + } + } +} diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java new file mode 100644 index 00000000..25700ba3 --- /dev/null +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java @@ -0,0 +1,27 @@ +package org.apache.rocketmq.eventbridge; + +public class Pair { + private T1 object1; + private T2 object2; + + public Pair(T1 object1, T2 object2) { + this.object1 = object1; + this.object2 = object2; + } + + public T1 getObject1() { + return object1; + } + + public void setObject1(T1 object1) { + this.object1 = object1; + } + + public T2 getObject2() { + return object2; + } + + public void setObject2(T2 object2) { + this.object2 = object2; + } +} diff --git a/pom.xml b/pom.xml index 6eaac500..2cb64c8c 100644 --- a/pom.xml +++ b/pom.xml @@ -105,6 +105,7 @@ common infrastructure test + metrics From 8e135eac4eeb77d7b1761b79fb52f3a5304902e6 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 5 Jul 2023 11:40:27 +0800 Subject: [PATCH 02/13] enhancement: enhance opentelemery --- .../eventbridge/adapter/runtime/Runtime.java | 4 +++ .../adapter/runtime/boot/EventMonitor.java | 28 +++++++++++++++++++ .../TargetRunnerConfigOnFileObserver.java | 4 +-- .../src/main/resources/runtime.properties | 12 +++++--- .../runtimer/RocketMQEventSubscriber.java | 6 ++-- .../eventbridge/BridgeMetricsManager.java | 18 +----------- .../src/main/resources/application.properties | 6 ++-- 7 files changed, 50 insertions(+), 28 deletions(-) create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java index 1a90ca34..93a1a27f 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java @@ -19,6 +19,7 @@ import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventBusListener; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventMonitor; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventRuleTransfer; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventTargetTrigger; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; @@ -70,10 +71,13 @@ public void initAndStart() throws Exception { circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig()); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); + + EventMonitor eventMonitor = new EventMonitor(eventSubscriber); BridgeMetricsManager metricsManager = eventSubscriber.getMetricsManager(); EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler, metricsManager); EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler, metricsManager); EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler, metricsManager); + RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventMonitor); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer); RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java new file mode 100644 index 00000000..4fac2d3e --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java @@ -0,0 +1,28 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.boot; + +import org.apache.rocketmq.eventbridge.BridgeMetricsManager; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; + +public class EventMonitor extends ServiceThread { + + private BridgeMetricsManager bridgeMetricsManager; + + public EventMonitor(EventSubscriber eventSubscriber) { + this.bridgeMetricsManager = eventSubscriber.getMetricsManager(); + } + @Override + public String getServiceName() { + return EventMonitor.class.getSimpleName(); + } + + @Override + public void run() { + bridgeMetricsManager.init(); + } + + @Override + public void shutdown() { + bridgeMetricsManager.shutdown(); + } +} \ No newline at end of file diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java index cbd01e07..2d70723e 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java @@ -82,7 +82,7 @@ public Set getLatestTargetRunnerConfig() { public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) { log.info("Watching task file changing:{}", pathName); - int index = pathName.lastIndexOf("/"); + int index = pathName.lastIndexOf("\\"); String filePath = pathName.substring(0, index); String fileName = pathName.substring(index + 1); service.scheduleAtFixedRate(() -> { @@ -109,7 +109,7 @@ public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherCo private String getConfigFilePath() { - return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath(); + return "F:\\gitrepo\\rocketmq-eventbridge\\adapter\\runtime\\src\\main\\resources\\target-runner.json"; } } \ No newline at end of file diff --git a/adapter/runtime/src/main/resources/runtime.properties b/adapter/runtime/src/main/resources/runtime.properties index b99c83d9..bc9a20a3 100644 --- a/adapter/runtime/src/main/resources/runtime.properties +++ b/adapter/runtime/src/main/resources/runtime.properties @@ -18,10 +18,14 @@ rocketmq.namesrvAddr=localhost:9876 rocketmq.consumer.pullTimeOut = 3000 rocketmq.consumer.pullBatchSize=20 rocketmq.cluster.name=DefaultCluster +rocketmq.consumerGroup=default ## runtime -rumtimer.name=eventbridge-runtimer -runtimer.pluginpath=/Users/Local/eventbridge/plugin -runtimer.storePathRootDir=/Users/Local/eventbridge/store +rumtime.name=eventbridge-runtimer ## listener listener.eventQueue.threshold=50000 -listener.targetQueue.threshold=50000 \ No newline at end of file +listener.targetQueue.threshold=50000 + +## monitor +metrics.endpoint.host=127.0.0.1 +metrics.endpoint.port=19090 +metrics.collector.mode=2 \ No newline at end of file diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index 69f2e667..e6c270be 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -196,14 +196,16 @@ private void initMqProperties() { String socks5Password = properties.getProperty("rocketmq.consumer.socks5Password"); String socks5Endpoint = properties.getProperty("rocketmq.consumer.socks5Endpoint"); - String metricsAddress = properties.getProperty("metrics.endpoint.address"); + String metricsPromExporterHost = properties.getProperty("metrics.endpoint.host"); + String metricsPromExporterPort = properties.getProperty("metrics.endpoint.port"); String metricsCollectorMode = properties.getProperty("metrics.collector.mode"); clientConfig.setNameSrvAddr(namesrvAddr); clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ? AccessChannel.CLOUD : AccessChannel.LOCAL); clientConfig.setNamespace(namespace); BridgeConfig bridgeConfig = new BridgeConfig(); - bridgeConfig.setEventBridgeAddress(metricsAddress); + bridgeConfig.setMetricsPromExporterHost(metricsPromExporterHost); + bridgeConfig.setMetricsPromExporterPort(Integer.parseInt(metricsPromExporterPort)); bridgeConfig.setMetricsExporterType(Integer.parseInt(metricsCollectorMode)); this.clientConfig = clientConfig; this.bridgeConfig = bridgeConfig; diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index ff1bc684..e095cd92 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -110,22 +110,6 @@ public void init() { return; } - String labels = bridgeConfig.getMetricsLabel(); - if (StringUtils.isNotBlank(labels)) { - List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(labels); - for (String item : kvPairs) { - String[] split = item.split(":"); - if (split.length != 2) { - LOGGER.warn("metricsLabel is not valid: {}", labels); - continue; - } - LABEL_MAP.put(split[0], split[1]); - } - } - if (bridgeConfig.isMetricsInDelta()) { - LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA); - } - SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() .setResource(Resource.empty()); @@ -196,7 +180,7 @@ public void init() { bridgeMeter = OpenTelemetrySdk.builder() .setMeterProvider(providerBuilder.build()) - .build() + .buildAndRegisterGlobal() .getMeter(OPEN_TELEMETRY_METER_NAME); initRequestMetrics(); diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 5911b302..7170b199 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -29,11 +29,11 @@ rocketmq.namesrvAddr=localhost:9876 rocketmq.cluster.name=DefaultCluster ## runtime -runtime.config.mode=DB +runtime.config.mode=FILE runtime.storage.mode=ROCKETMQ rumtime.name=eventbridge-runtimer -runtime.pluginpath=~/eventbridge/plugin - +runtime.pluginpath=E:\\rocketmq_files\\plugins +runtime.storePathRootDir=E:\\rocketmq_files\\storeRoot ## log app.name=rocketmqeventbridge From c1da38ce37c0e59c739f5c4b86c400fac3b3a4f6 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 5 Jul 2023 11:49:42 +0800 Subject: [PATCH 03/13] recover --- .../runtime/service/TargetRunnerConfigOnFileObserver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java index 2d70723e..cbd01e07 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnFileObserver.java @@ -82,7 +82,7 @@ public Set getLatestTargetRunnerConfig() { public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) { log.info("Watching task file changing:{}", pathName); - int index = pathName.lastIndexOf("\\"); + int index = pathName.lastIndexOf("/"); String filePath = pathName.substring(0, index); String fileName = pathName.substring(index + 1); service.scheduleAtFixedRate(() -> { @@ -109,7 +109,7 @@ public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherCo private String getConfigFilePath() { - return "F:\\gitrepo\\rocketmq-eventbridge\\adapter\\runtime\\src\\main\\resources\\target-runner.json"; + return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath(); } } \ No newline at end of file From a6fb646973a188bf1e93df24ee69504ad121e5f2 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 5 Jul 2023 11:52:20 +0800 Subject: [PATCH 04/13] add license && remove useless code --- .../eventbridge/metrics/NopLongCounter.java | 17 +++++++++++++++ .../eventbridge/metrics/NopLongHistogram.java | 17 +++++++++++++++ .../metrics/NopLongUpDownCounter.java | 17 +++++++++++++++ .../metrics/NopObservableLongGauge.java | 17 +++++++++++++++ .../rocketmq/eventbridge/BridgeConfig.java | 17 +++++++++++++++ .../eventbridge/BridgeMetricsConstant.java | 21 +++++++++++++++---- .../eventbridge/BridgeMetricsManager.java | 17 +++++++++++++++ .../org/apache/rocketmq/eventbridge/Pair.java | 17 +++++++++++++++ 8 files changed, 136 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java index fa48bc19..fe84f98a 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.metrics; import io.opentelemetry.api.common.Attributes; diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java index f55986c7..a6d4029b 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.metrics; import io.opentelemetry.api.common.Attributes; diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java index f5ef4296..84427c6c 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.metrics; import io.opentelemetry.api.common.Attributes; diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java index 17cbbc3a..37a801a5 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.metrics; import io.opentelemetry.api.metrics.ObservableLongGauge; diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java index 27c3ae9a..4bc1d4d7 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge; public class BridgeConfig { diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java index 90a38385..1a3c1c27 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge; public class BridgeMetricsConstant { @@ -15,8 +32,4 @@ public class BridgeMetricsConstant { /** eventbridge process message latency**/ public static final String HISTOGRAM_RPC_LATENCY = "process_latency"; - - public static final String LABEL_AGGREGATION = "aggregation"; - public static final String AGGREGATION_DELTA = "delta"; - public static final String LABEL_PROCESSOR = "processor"; } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index e095cd92..2182c845 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge; import com.google.common.base.Splitter; diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java index 25700ba3..4afe22e4 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge; public class Pair { From 63ca0cb5cdb29f992f58825cbc76c3e92119be5b Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 5 Jul 2023 11:53:06 +0800 Subject: [PATCH 05/13] add license --- .../adapter/runtime/boot/EventMonitor.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java index 4fac2d3e..495d4225 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.boot; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; From 5b319660ebe851e603e138837fbfafc94b099e79 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 5 Jul 2023 11:56:04 +0800 Subject: [PATCH 06/13] add license --- metrics/pom.xml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/metrics/pom.xml b/metrics/pom.xml index 7eb23ce4..4d2294ff 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -1,4 +1,13 @@ - + From e641e279f8092313d444184e39bd2105e3a396ef Mon Sep 17 00:00:00 2001 From: chengxy Date: Mon, 17 Jul 2023 16:47:11 +0800 Subject: [PATCH 07/13] refactor metrics collector --- .../eventbridge/adapter/runtime/Runtime.java | 4 +-- .../adapter/runtime/boot/EventMonitor.java | 4 +-- .../boot/listener/EventSubscriber.java | 7 ----- .../src/main/resources/runtime.properties | 7 +---- .../runtimer/RocketMQEventSubscriber.java | 15 ----------- metrics/pom.xml | 4 +++ .../eventbridge/BridgeMetricsManager.java | 26 ++++++++++++++++--- metrics/src/main/resources/metrics.properties | 4 +++ 8 files changed, 36 insertions(+), 35 deletions(-) create mode 100644 metrics/src/main/resources/metrics.properties diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java index 93a1a27f..d831cfc4 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java @@ -72,8 +72,8 @@ public void initAndStart() throws Exception { runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); - EventMonitor eventMonitor = new EventMonitor(eventSubscriber); - BridgeMetricsManager metricsManager = eventSubscriber.getMetricsManager(); + BridgeMetricsManager metricsManager = new BridgeMetricsManager(); + EventMonitor eventMonitor = new EventMonitor(metricsManager); EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler, metricsManager); EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler, metricsManager); EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler, metricsManager); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java index 495d4225..b60d12c1 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java @@ -25,8 +25,8 @@ public class EventMonitor extends ServiceThread { private BridgeMetricsManager bridgeMetricsManager; - public EventMonitor(EventSubscriber eventSubscriber) { - this.bridgeMetricsManager = eventSubscriber.getMetricsManager(); + public EventMonitor(BridgeMetricsManager metricsManager) { + this.bridgeMetricsManager = metricsManager; } @Override public String getServiceName() { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java index 772d9ef2..8fa87fc9 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java @@ -35,13 +35,6 @@ public abstract class EventSubscriber implements TargetRunnerListener { */ public abstract void refresh(SubscribeRunnerKeys subscribeRunnerKeys, RefreshTypeEnum refreshTypeEnum); - - /** - * fetch metrics configuration - * @return - */ - public abstract BridgeMetricsManager getMetricsManager(); - /** * Pull connect records from store, Blocking method when is empty. * diff --git a/adapter/runtime/src/main/resources/runtime.properties b/adapter/runtime/src/main/resources/runtime.properties index bc9a20a3..39d00e45 100644 --- a/adapter/runtime/src/main/resources/runtime.properties +++ b/adapter/runtime/src/main/resources/runtime.properties @@ -23,9 +23,4 @@ rocketmq.consumerGroup=default rumtime.name=eventbridge-runtimer ## listener listener.eventQueue.threshold=50000 -listener.targetQueue.threshold=50000 - -## monitor -metrics.endpoint.host=127.0.0.1 -metrics.endpoint.port=19090 -metrics.collector.mode=2 \ No newline at end of file +listener.targetQueue.threshold=50000 \ No newline at end of file diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index e6c270be..b0dfa86c 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -122,13 +122,6 @@ public void refresh(SubscribeRunnerKeys subscribeRunnerKeys, RefreshTypeEnum ref } } - @Override - public BridgeMetricsManager getMetricsManager() { - BridgeMetricsManager metricsManager = new BridgeMetricsManager(bridgeConfig); - return metricsManager; - } - - @Override public List pull() { ArrayList messages = new ArrayList<>(); @@ -196,19 +189,11 @@ private void initMqProperties() { String socks5Password = properties.getProperty("rocketmq.consumer.socks5Password"); String socks5Endpoint = properties.getProperty("rocketmq.consumer.socks5Endpoint"); - String metricsPromExporterHost = properties.getProperty("metrics.endpoint.host"); - String metricsPromExporterPort = properties.getProperty("metrics.endpoint.port"); - String metricsCollectorMode = properties.getProperty("metrics.collector.mode"); clientConfig.setNameSrvAddr(namesrvAddr); clientConfig.setAccessChannel(AccessChannel.CLOUD.name().equals(accessChannel) ? AccessChannel.CLOUD : AccessChannel.LOCAL); clientConfig.setNamespace(namespace); - BridgeConfig bridgeConfig = new BridgeConfig(); - bridgeConfig.setMetricsPromExporterHost(metricsPromExporterHost); - bridgeConfig.setMetricsPromExporterPort(Integer.parseInt(metricsPromExporterPort)); - bridgeConfig.setMetricsExporterType(Integer.parseInt(metricsCollectorMode)); this.clientConfig = clientConfig; - this.bridgeConfig = bridgeConfig; if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { this.sessionCredentials = new SessionCredentials(accessKey, secretKey); diff --git a/metrics/pom.xml b/metrics/pom.xml index 4d2294ff..b6f696ab 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -38,6 +38,10 @@ 1.0.0 provided + + org.springframework + spring-context + io.opentelemetry opentelemetry-exporter-otlp diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index 2182c845..216a23d5 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -40,11 +40,13 @@ import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.resources.Resource; +import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; @@ -54,6 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; +import org.springframework.core.io.support.PropertiesLoaderUtils; import static org.apache.rocketmq.eventbridge.BridgeMetricsConstant.*; @@ -62,7 +65,7 @@ public class BridgeMetricsManager { private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class); - private final BridgeConfig bridgeConfig; + private BridgeConfig bridgeConfig; private final static Map LABEL_MAP = new HashMap<>(); private OtlpGrpcMetricExporter metricExporter; private PeriodicMetricReader periodicMetricReader; @@ -85,8 +88,25 @@ public class BridgeMetricsManager { public static LongCounter throughputOutTotal = new NopLongCounter(); public static LongHistogram messageSize = new NopLongHistogram(); - public BridgeMetricsManager(BridgeConfig bridgeConfig) { - this.bridgeConfig = bridgeConfig; + public BridgeMetricsManager() { + initMetricsProperties(); + } + + private void initMetricsProperties() { + try { + Properties properties = PropertiesLoaderUtils.loadAllProperties("metrics.properties"); + String metricsPromExporterHost = properties.getProperty("metrics.endpoint.host"); + String metricsPromExporterPort = properties.getProperty("metrics.endpoint.port"); + String metricsCollectorMode = properties.getProperty("metrics.collector.mode"); + + BridgeConfig bridgeConfig = new BridgeConfig(); + bridgeConfig.setMetricsPromExporterHost(metricsPromExporterHost); + bridgeConfig.setMetricsPromExporterPort(Integer.parseInt(metricsPromExporterPort)); + bridgeConfig.setMetricsExporterType(Integer.parseInt(metricsCollectorMode)); + this.bridgeConfig = bridgeConfig; + } catch (IOException e) { + LOGGER.error("init metrics properties exception, stack trace- ", e); + } } diff --git a/metrics/src/main/resources/metrics.properties b/metrics/src/main/resources/metrics.properties new file mode 100644 index 00000000..cea5c999 --- /dev/null +++ b/metrics/src/main/resources/metrics.properties @@ -0,0 +1,4 @@ +## metrics +metrics.endpoint.host=127.0.0.1 +metrics.endpoint.port=19090 +metrics.collector.mode=2 \ No newline at end of file From 9d655a0de72e8dd7943b8c92841945aa72f39a4e Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 18 Jul 2023 11:39:57 +0800 Subject: [PATCH 08/13] add test metrics --- .../runtime/boot/EventBusListener.java | 2 +- .../runtime/boot/EventRuleTransfer.java | 6 ++ .../eventbridge/metrics/NopLongCounter.java | 16 +---- .../eventbridge/BridgeMetricsConstant.java | 25 ++++--- .../eventbridge/BridgeMetricsManager.java | 66 ++++++++----------- 5 files changed, 53 insertions(+), 62 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index 3b2fb45c..ee86b70c 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -61,7 +61,7 @@ public void run() { List pullRecordList = Lists.newArrayList(); try { pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>()); - BridgeMetricsManager.messagesInTotal.add(pullRecordList.size()); + metricsManager.eventbusInEventsTotal(pullRecordList.size()); if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); continue; diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 2621c14d..2939dc66 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -73,6 +73,7 @@ public void run() { List afterTransformConnect= Lists.newArrayList(); while (!stopped) { try { + long startTime = System.currentTimeMillis(); Map> eventRecordMap = circulatorContext.takeEventRecords(batchSize); if (MapUtils.isEmpty(eventRecordMap)) { logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); @@ -108,10 +109,15 @@ public void run() { completableFutures.add(transformFuture); }); } + long endTime = System.currentTimeMillis(); + long latency = endTime - startTime; CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); + //success + metricsManager.eventRuleLatencySeconds(latency); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { + //failed logger.error("transfer event record failed, stackTrace-", exception); afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception)); } diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java index fe84f98a..536610bc 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java @@ -17,20 +17,8 @@ package org.apache.rocketmq.eventbridge.metrics; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.context.Context; +import io.opentelemetry.api.metrics.ObservableLongCounter; -public class NopLongCounter implements LongCounter { - @Override public void add(long l) { +public class NopLongCounter implements ObservableLongCounter { - } - - @Override public void add(long l, Attributes attributes) { - - } - - @Override public void add(long l, Attributes attributes, Context context) { - - } } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java index 1a3c1c27..86536968 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java @@ -17,19 +17,28 @@ package org.apache.rocketmq.eventbridge; +import java.util.HashMap; +import java.util.Map; + public class BridgeMetricsConstant { public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter"; - - public static final String GAUGE_PROCESSOR_GAUGE = "target_queue_gauge"; - public static final String RULE_QUEUE_GAUGE = "rule_queue_gauge"; - - public static final String COUNTER_MESSAGES_IN_TOTAL = "eventbridge_messages_in_total"; - public static final String COUNTER_MESSAGES_OUT_TOTAL = "eventbridge_messages_out_total"; - public static final String COUNTER_THROUGHPUT_IN_TOTAL = "eventbridge_throughput_in_total"; - public static final String COUNTER_THROUGHPUT_OUT_TOTAL = "eventbridge_throughput_out_total"; public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; + public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total"; + public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds"; /** eventbridge process message latency**/ public static final String HISTOGRAM_RPC_LATENCY = "process_latency"; + + public static final Map ACCOUNT_LABELS = new HashMap() { + { put("account_id", "account id"); } + { put("runnerName", "runnerName"); } + { put("status", "status"); } + }; + + public static final Map RUNNER_NAME_LABELS = new HashMap() { + { put("account_id", "account id"); } + { put("runnerName", "runner name"); } + }; + } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index 216a23d5..dc81663c 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongCounter; import io.opentelemetry.api.metrics.ObservableLongGauge; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; @@ -66,7 +67,6 @@ public class BridgeMetricsManager { private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class); private BridgeConfig bridgeConfig; - private final static Map LABEL_MAP = new HashMap<>(); private OtlpGrpcMetricExporter metricExporter; private PeriodicMetricReader periodicMetricReader; private PrometheusHttpServer prometheusHttpServer; @@ -75,17 +75,10 @@ public class BridgeMetricsManager { // queue stats metrics public static ObservableLongGauge targetGauge = new NopObservableLongGauge(); - public static ObservableLongGauge ruleGauge = new NopObservableLongGauge(); - - //invoke timeout public static LongHistogram invokeLatency = new NopLongHistogram(); - // request metrics - public static LongCounter messagesInTotal = new NopLongCounter(); - public static LongCounter messagesOutTotal = new NopLongCounter(); - public static LongCounter throughputInTotal = new NopLongCounter(); - public static LongCounter throughputOutTotal = new NopLongCounter(); + public static ObservableLongCounter messagesOutTotal = new NopLongCounter(); public static LongHistogram messageSize = new NopLongHistogram(); public BridgeMetricsManager() { @@ -109,10 +102,9 @@ private void initMetricsProperties() { } } - - public static AttributesBuilder newAttributesBuilder() { + public static AttributesBuilder newAttributesBuilder(Map labels) { AttributesBuilder attributesBuilder = Attributes.builder(); - LABEL_MAP.forEach(attributesBuilder::put); + labels.forEach(attributesBuilder::put); return attributesBuilder; } @@ -249,50 +241,46 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { } - public void initTriggerMetrics(List eventTargetQueue, String label) { + public AttributesBuilder addGroup(Map labels ) { + return newAttributesBuilder(labels); + } - targetGauge = bridgeMeter.gaugeBuilder(GAUGE_PROCESSOR_GAUGE) - .setDescription("Request processor gauge") - .ofLongs() + private void countMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) { + messagesOutTotal = bridgeMeter.counterBuilder(metricsName) + .setDescription("Total number of outgoing messages") .buildWithCallback(measurement -> { - measurement.record(eventTargetQueue.size(), newAttributesBuilder().put(label, "target").build()); + measurement.record(count, attributesBuilder.build()); }); + } - public void initRuleMetrics(List eventRuleQueue, String label) { - ruleGauge = bridgeMeter.gaugeBuilder(RULE_QUEUE_GAUGE) - .setDescription("Request processor gauge") + private void gaugeMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) { + targetGauge = bridgeMeter.gaugeBuilder(metricsName) + .setDescription("Gauge of total messages ") .ofLongs() - .buildWithCallback(measurement -> { - measurement.record(eventRuleQueue.size(), newAttributesBuilder().put(label, "rule").build()); + .buildWithCallback( measurement -> { + measurement.record(count, attributesBuilder.build()); }); - } - private void initRequestMetrics() { - messagesInTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_IN_TOTAL) - .setDescription("Total number of incoming messages") - .build(); - - messagesOutTotal = bridgeMeter.counterBuilder(COUNTER_MESSAGES_OUT_TOTAL) - .setDescription("Total number of outgoing messages") - .build(); - - throughputInTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_IN_TOTAL) - .setDescription("Total traffic of incoming messages") - .build(); + public void eventbusInEventsTotal(long count) { + AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS); + countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder); + } - throughputOutTotal = bridgeMeter.counterBuilder(COUNTER_THROUGHPUT_OUT_TOTAL) - .setDescription("Total traffic of outgoing messages") - .build(); + public void eventRuleLatencySeconds(long latency) { + AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS); + gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder); + } + public void initRequestMetrics() { messageSize = bridgeMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE) .setDescription("Incoming messages size") .ofLongs() .build(); } - public static void initRuleMetrics(Meter meter) { + public void initRuleMetrics(Meter meter) { invokeLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY) .setDescription("invoke latency") .setUnit("milliseconds") From 8af194e3e6e87242773e78baef16ff29df22fa92 Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 18 Jul 2023 15:17:01 +0800 Subject: [PATCH 09/13] refactor metrics --- .../runtime/boot/EventBusListener.java | 22 ++++++++++++++-- .../runtime/boot/EventRuleTransfer.java | 13 +++++++--- .../eventbridge/BridgeMetricsManager.java | 26 ++++++++++++++++--- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index ee86b70c..d7891dad 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -29,6 +29,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +63,12 @@ public void run() { List pullRecordList = Lists.newArrayList(); try { pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>()); - metricsManager.eventbusInEventsTotal(pullRecordList.size()); + + for (ConnectRecord connectRecord : pullRecordList) { + String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + String accountId = getAccountId(connectRecord); + metricsManager.eventbusInEventsTotal(runnerName, accountId, "success", 1); + } if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); continue; @@ -69,11 +76,22 @@ public void run() { circulatorContext.offerEventRecords(pullRecordList); } catch (Exception exception) { logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); - pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception)); + pullRecordList.forEach(pullRecord -> { + String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + String accountId = getAccountId(pullRecord); + metricsManager.eventbusInEventsTotal(runnerName, accountId, "failed", 1); + errorHandler.handle(pullRecord, exception); + }); } } } + public String getAccountId(ConnectRecord connectRecord) { + String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); + return runnerConfig.getAccountId(); + } + @Override public String getServiceName() { return EventBusListener.class.getSimpleName(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 2939dc66..dda1f954 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; @@ -31,6 +33,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; import org.slf4j.Logger; @@ -73,7 +76,6 @@ public void run() { List afterTransformConnect= Lists.newArrayList(); while (!stopped) { try { - long startTime = System.currentTimeMillis(); Map> eventRecordMap = circulatorContext.takeEventRecords(batchSize); if (MapUtils.isEmpty(eventRecordMap)) { logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); @@ -90,18 +92,24 @@ public void run() { afterTransformConnect.clear(); List> completableFutures = Lists.newArrayList(); for (String runnerName : eventRecordMap.keySet()) { + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); TransformEngine curTransformEngine = latestTransformMap.get(runnerName); List curEventRecords = eventRecordMap.get(runnerName); curEventRecords.forEach(pullRecord -> { + long startTime = System.currentTimeMillis(); CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { logger.error("transfer do transform event record failed,stackTrace-", exception); + //failed + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "failed", System.currentTimeMillis() - startTime); errorHandler.handle(pullRecord, exception); return null; }) .thenAccept(pushRecord -> { if (Objects.nonNull(pushRecord)) { afterTransformConnect.add(pushRecord); + // success + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "success",System.currentTimeMillis() - startTime); } else { offsetManager.commit(pullRecord); } @@ -109,12 +117,9 @@ public void run() { completableFutures.add(transformFuture); }); } - long endTime = System.currentTimeMillis(); - long latency = endTime - startTime; CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); //success - metricsManager.eventRuleLatencySeconds(latency); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { //failed diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index dc81663c..2474f939 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -263,13 +263,15 @@ private void gaugeMetrics(String metricsName, long count, AttributesBuilder attr }); } - public void eventbusInEventsTotal(long count) { - AttributesBuilder attributesBuilder = addGroup(ACCOUNT_LABELS); + public void eventbusInEventsTotal(String runnerName, String accountId, String status, long count) { + Map labelMaps = buildLabelMap(runnerName, accountId, status); + AttributesBuilder attributesBuilder = addGroup(labelMaps); countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder); } - public void eventRuleLatencySeconds(long latency) { - AttributesBuilder attributesBuilder = addGroup(RUNNER_NAME_LABELS); + public void eventRuleLatencySeconds(String runnerName, String accountId ,String status, long latency) { + Map labelMaps = buildLabelMap(runnerName, accountId, status); + AttributesBuilder attributesBuilder = addGroup(labelMaps); gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder); } @@ -328,4 +330,20 @@ public void shutdown() { loggingMetricExporter.shutdown(); } } + + private Map buildLabelMap(String runnerName, String accountId, String status) { + Map labelMap = new HashMap<>(); + if (StringUtils.isNotBlank(runnerName)) { + labelMap.put("runner_name", runnerName); + } + + if (StringUtils.isNotBlank(accountId)) { + + labelMap.put("account_id", accountId); + } + if (StringUtils.isNotBlank(status)) { + labelMap.put("status", status); + } + return labelMap; + } } From b64bfc7e7a17f4f309d28923e1e84f3662895c25 Mon Sep 17 00:00:00 2001 From: chengxy Date: Thu, 20 Jul 2023 15:33:03 +0800 Subject: [PATCH 10/13] add histogram metrics --- .../eventbridge/BridgeMetricsManager.java | 64 ++++++++----------- 1 file changed, 28 insertions(+), 36 deletions(-) diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index 2474f939..4e4acd31 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongCounter; @@ -75,11 +74,10 @@ public class BridgeMetricsManager { // queue stats metrics public static ObservableLongGauge targetGauge = new NopObservableLongGauge(); - //invoke timeout + // invoke timeout public static LongHistogram invokeLatency = new NopLongHistogram(); // request metrics public static ObservableLongCounter messagesOutTotal = new NopLongCounter(); - public static LongHistogram messageSize = new NopLongHistogram(); public BridgeMetricsManager() { initMetricsProperties(); @@ -212,8 +210,6 @@ public void init() { .buildAndRegisterGlobal() .getMeter(OPEN_TELEMETRY_METER_NAME); - initRequestMetrics(); - initRuleMetrics(bridgeMeter); } private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { @@ -230,6 +226,7 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { .setType(InstrumentType.HISTOGRAM) .setName(HISTOGRAM_MESSAGE_SIZE) .build(); + View messageSizeView = View.builder() .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) .build(); @@ -240,6 +237,29 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { } } + public static List> getMetricsView() { + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(3).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(7).toMillis(), + (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofSeconds(1).toMillis(), + (double) Duration.ofSeconds(2).toMillis(), + (double) Duration.ofSeconds(3).toMillis() + ); + InstrumentSelector selector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_RPC_LATENCY) + .build(); + View view = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) + .build(); + return Lists.newArrayList(new Pair<>(selector, view)); + } + + public AttributesBuilder addGroup(Map labels ) { return newAttributesBuilder(labels); @@ -275,42 +295,14 @@ public void eventRuleLatencySeconds(String runnerName, String accountId ,String gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder); } - public void initRequestMetrics() { - messageSize = bridgeMeter.histogramBuilder(HISTOGRAM_MESSAGE_SIZE) - .setDescription("Incoming messages size") - .ofLongs() - .build(); - } - public void initRuleMetrics(Meter meter) { - invokeLatency = meter.histogramBuilder(HISTOGRAM_RPC_LATENCY) + public void countLatencyStat(long latency, String metricsName, String runnerName, String accountId, String status) { + invokeLatency = bridgeMeter.histogramBuilder(metricsName) .setDescription("invoke latency") .setUnit("milliseconds") .ofLongs() .build(); - - } - - public static List> getMetricsView() { - List rpcCostTimeBuckets = Arrays.asList( - (double) Duration.ofMillis(1).toMillis(), - (double) Duration.ofMillis(3).toMillis(), - (double) Duration.ofMillis(5).toMillis(), - (double) Duration.ofMillis(7).toMillis(), - (double) Duration.ofMillis(10).toMillis(), - (double) Duration.ofMillis(100).toMillis(), - (double) Duration.ofSeconds(1).toMillis(), - (double) Duration.ofSeconds(2).toMillis(), - (double) Duration.ofSeconds(3).toMillis() - ); - InstrumentSelector selector = InstrumentSelector.builder() - .setType(InstrumentType.HISTOGRAM) - .setName(HISTOGRAM_RPC_LATENCY) - .build(); - View view = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) - .build(); - return Lists.newArrayList(new Pair<>(selector, view)); + invokeLatency.record(latency, newAttributesBuilder(buildLabelMap(runnerName, accountId, status)).build()); } From e0d59a2985fa0a8f85267d2ac93c6d82daab397a Mon Sep 17 00:00:00 2001 From: chengxy Date: Thu, 20 Jul 2023 16:42:24 +0800 Subject: [PATCH 11/13] refactor metrics --- .../runtime/boot/EventBusListener.java | 15 +++++------ .../runtime/boot/EventRuleTransfer.java | 18 +++++++++---- .../runtime/boot/EventTargetTrigger.java | 12 ++++++++- .../adapter/runtime/utils/RunnerUtil.java | 25 +++++++++++++++++++ .../eventbridge/BridgeMetricsConstant.java | 24 ++++++++++-------- .../eventbridge/BridgeMetricsManager.java | 9 ++++++- 6 files changed, 76 insertions(+), 27 deletions(-) create mode 100644 adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java index d7891dad..2c46cb31 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java @@ -25,6 +25,7 @@ import java.util.Optional; import org.apache.commons.collections.CollectionUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsConstant; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber; @@ -32,6 +33,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; +import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,8 +68,8 @@ public void run() { for (ConnectRecord connectRecord : pullRecordList) { String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); - String accountId = getAccountId(connectRecord); - metricsManager.eventbusInEventsTotal(runnerName, accountId, "success", 1); + String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName); + metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name(), 1); } if (CollectionUtils.isEmpty(pullRecordList)) { this.waitForRunning(1000); @@ -78,19 +80,14 @@ public void run() { logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception); pullRecordList.forEach(pullRecord -> { String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); - String accountId = getAccountId(pullRecord); - metricsManager.eventbusInEventsTotal(runnerName, accountId, "failed", 1); + String accountId = RunnerUtil.getAccountId(circulatorContext, pullRecord); + metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name(), 1); errorHandler.handle(pullRecord, exception); }); } } } - public String getAccountId(ConnectRecord connectRecord) { - String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); - TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); - return runnerConfig.getAccountId(); - } @Override public String getServiceName() { diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index dda1f954..7c951ec2 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -28,6 +28,7 @@ import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsConstant; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; @@ -36,6 +37,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,17 +103,22 @@ public void run() { .exceptionally((exception) -> { logger.error("transfer do transform event record failed,stackTrace-", exception); //failed - metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "failed", System.currentTimeMillis() - startTime); + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime); + metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.FAILED.name(), 1); errorHandler.handle(pullRecord, exception); return null; }) .thenAccept(pushRecord -> { if (Objects.nonNull(pushRecord)) { afterTransformConnect.add(pushRecord); - // success - metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), "success",System.currentTimeMillis() - startTime); + metricsManager.eventRuleLatencySeconds(RunnerUtil.getRunnerName(pushRecord), RunnerUtil.getAccountId(circulatorContext, pushRecord), + BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime); + } else { offsetManager.commit(pullRecord); + // success + metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.SUCCESS.name(), 1); + metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.SUCCESS.name(),System.currentTimeMillis() - startTime); } }); completableFutures.add(transformFuture); @@ -119,12 +126,13 @@ public void run() { } CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); - //success logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); } catch (Exception exception) { //failed logger.error("transfer event record failed, stackTrace-", exception); - afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception)); + afterTransformConnect.forEach(transferRecord -> { + errorHandler.handle(transferRecord, exception); + }); } } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index 838e8948..1f1602ac 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -26,12 +26,14 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.collections.MapUtils; +import org.apache.rocketmq.eventbridge.BridgeMetricsConstant; import org.apache.rocketmq.eventbridge.BridgeMetricsManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +74,8 @@ public void run() { } for(String runnerName: targetRecordMap.keySet()){ + long startTime = System.currentTimeMillis(); + String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName); ExecutorService executorService = circulatorContext.getExecutorService(runnerName); executorService.execute(() -> { SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName); @@ -79,15 +83,21 @@ public void run() { try { sinkTask.put(triggerRecords); offsetManager.commit(triggerRecords); + metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name() ); } catch (Exception exception) { logger.error(getServiceName() + " push target exception, stackTrace-", exception); - triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception)); + triggerRecords.forEach(triggerRecord -> { + errorHandler.handle(triggerRecord, exception); + metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name() ); + + }); } }); } } } + @Override public String getServiceName() { return EventTargetTrigger.class.getSimpleName(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java new file mode 100644 index 00000000..6d32bb5d --- /dev/null +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java @@ -0,0 +1,25 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.utils; + +import io.openmessaging.connector.api.data.ConnectRecord; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; +import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine; + +public class RunnerUtil { + + public static String getAccountId(CirculatorContext circulatorContext, ConnectRecord connectRecord) { + String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); + return runnerConfig.getAccountId(); + } + + + public static String getAccountId(CirculatorContext circulatorContext, String runnerName) { + TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName); + return runnerConfig.getAccountId(); + } + + public static String getRunnerName(ConnectRecord connectRecord) { + return connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME); + } +} diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java index 86536968..c77f44e4 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java @@ -24,21 +24,23 @@ public class BridgeMetricsConstant { public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter"; public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total"; + public static final String EVENTRULE_FILTER_EVENTS_TOTAL = "eventbridge_eventrule_filter_events_total"; public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds"; + public static final String EVENTRULE_TRIGGER_LATENCY = "eventbridge_eventrule_trigger_latency"; - /** eventbridge process message latency**/ - public static final String HISTOGRAM_RPC_LATENCY = "process_latency"; - public static final Map ACCOUNT_LABELS = new HashMap() { - { put("account_id", "account id"); } - { put("runnerName", "runnerName"); } - { put("status", "status"); } - }; + public enum Status { + SUCCESS("success"), + FAILED("failed"); - public static final Map RUNNER_NAME_LABELS = new HashMap() { - { put("account_id", "account id"); } - { put("runnerName", "runner name"); } - }; + private String status; + Status(String status){ + this.status = status; + } + public String getStatus() { + return status; + } + } } diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java index 4e4acd31..365a92f0 100644 --- a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java +++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java @@ -251,7 +251,7 @@ public static List> getMetricsView() { ); InstrumentSelector selector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) - .setName(HISTOGRAM_RPC_LATENCY) + .setName(EVENTRULE_TRIGGER_LATENCY) .build(); View view = View.builder() .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) @@ -289,6 +289,13 @@ public void eventbusInEventsTotal(String runnerName, String accountId, String st countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder); } + + public void eventruleFilterEventsTotal(String runnerName, String accountId, String status, long count) { + Map labelMaps = buildLabelMap(runnerName, accountId, status); + AttributesBuilder attributesBuilder = addGroup(labelMaps); + countMetrics(EVENTRULE_FILTER_EVENTS_TOTAL, count, attributesBuilder); + } + public void eventRuleLatencySeconds(String runnerName, String accountId ,String status, long latency) { Map labelMaps = buildLabelMap(runnerName, accountId, status); AttributesBuilder attributesBuilder = addGroup(labelMaps); From 49878891c7318755936e62fb58650532ec452618 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 2 Aug 2023 10:38:57 +0800 Subject: [PATCH 12/13] add license --- .../adapter/runtime/utils/RunnerUtil.java | 17 +++++++++++++++++ metrics/src/main/resources/metrics.properties | 15 +++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java index 6d32bb5d..4a9f8295 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.utils; import io.openmessaging.connector.api.data.ConnectRecord; diff --git a/metrics/src/main/resources/metrics.properties b/metrics/src/main/resources/metrics.properties index cea5c999..1b9bd52a 100644 --- a/metrics/src/main/resources/metrics.properties +++ b/metrics/src/main/resources/metrics.properties @@ -1,3 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + ## metrics metrics.endpoint.host=127.0.0.1 metrics.endpoint.port=19090 From 44560c6068cba6443b4a0b73c7333cd7d415d697 Mon Sep 17 00:00:00 2001 From: chengxy Date: Mon, 14 Aug 2023 10:20:58 +0800 Subject: [PATCH 13/13] recover properties --- start/src/main/resources/application.properties | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 7170b199..37a0a5e2 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -29,11 +29,10 @@ rocketmq.namesrvAddr=localhost:9876 rocketmq.cluster.name=DefaultCluster ## runtime -runtime.config.mode=FILE +runtime.config.mode=DB runtime.storage.mode=ROCKETMQ rumtime.name=eventbridge-runtimer -runtime.pluginpath=E:\\rocketmq_files\\plugins -runtime.storePathRootDir=E:\\rocketmq_files\\storeRoot +runtime.pluginpath=~/eventbridge/plugin ## log app.name=rocketmqeventbridge