diff --git a/adapter/runtime/pom.xml b/adapter/runtime/pom.xml
index a3f501fd..ff07354c 100644
--- a/adapter/runtime/pom.xml
+++ b/adapter/runtime/pom.xml
@@ -101,6 +101,12 @@
4.0.3
test
+
+ org.apache.rocketmq.eventbridge
+ rocketmq-eventbridge-metrics
+ 1.0.0
+ compile
+
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
index af8bbbd3..d831cfc4 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/Runtime.java
@@ -17,7 +17,9 @@
package org.apache.rocketmq.eventbridge.adapter.runtime;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventBusListener;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventMonitor;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventRuleTransfer;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventTargetTrigger;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
@@ -69,9 +71,13 @@ public void initAndStart() throws Exception {
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
runnerConfigObserver.registerListener(circulatorContext);
runnerConfigObserver.registerListener(eventSubscriber);
- EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler);
- EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
- EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler);
+
+ BridgeMetricsManager metricsManager = new BridgeMetricsManager();
+ EventMonitor eventMonitor = new EventMonitor(metricsManager);
+ EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler, metricsManager);
+ EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler, metricsManager);
+ EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler, metricsManager);
+ RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventMonitor);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher);
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
index 48af27f3..2c46cb31 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventBusListener.java
@@ -19,12 +19,21 @@
import com.google.common.collect.Lists;
import io.openmessaging.connector.api.data.ConnectRecord;
+
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+
import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,12 +49,14 @@ public class EventBusListener extends ServiceThread {
private final CirculatorContext circulatorContext;
private final EventSubscriber eventSubscriber;
private final ErrorHandler errorHandler;
+ private BridgeMetricsManager metricsManager;
public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber,
- ErrorHandler errorHandler) {
+ ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.eventSubscriber = eventSubscriber;
this.errorHandler = errorHandler;
+ this.metricsManager = metricsManager;
}
@Override
@@ -53,7 +64,13 @@ public void run() {
while (!stopped) {
List pullRecordList = Lists.newArrayList();
try {
- pullRecordList = eventSubscriber.pull();
+ pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>());
+
+ for (ConnectRecord connectRecord : pullRecordList) {
+ String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
+ String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName);
+ metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name(), 1);
+ }
if (CollectionUtils.isEmpty(pullRecordList)) {
this.waitForRunning(1000);
continue;
@@ -61,11 +78,17 @@ public void run() {
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
- pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
+ pullRecordList.forEach(pullRecord -> {
+ String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
+ String accountId = RunnerUtil.getAccountId(circulatorContext, pullRecord);
+ metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name(), 1);
+ errorHandler.handle(pullRecord, exception);
+ });
}
}
}
+
@Override
public String getServiceName() {
return EventBusListener.class.getSimpleName();
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java
new file mode 100644
index 00000000..b60d12c1
--- /dev/null
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventMonitor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtime.boot;
+
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
+import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
+
+public class EventMonitor extends ServiceThread {
+
+ private BridgeMetricsManager bridgeMetricsManager;
+
+ public EventMonitor(BridgeMetricsManager metricsManager) {
+ this.bridgeMetricsManager = metricsManager;
+ }
+ @Override
+ public String getServiceName() {
+ return EventMonitor.class.getSimpleName();
+ }
+
+ @Override
+ public void run() {
+ bridgeMetricsManager.init();
+ }
+
+ @Override
+ public void shutdown() {
+ bridgeMetricsManager.shutdown();
+ }
+}
\ No newline at end of file
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
index f24ed1b7..43d5b175 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java
@@ -25,14 +25,19 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+
import javax.annotation.PostConstruct;
import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
+import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +53,14 @@ public class EventRuleTransfer extends ServiceThread {
private final CirculatorContext circulatorContext;
private final OffsetManager offsetManager;
private final ErrorHandler errorHandler;
+ private BridgeMetricsManager metricsManager;
public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager,
- ErrorHandler errorHandler) {
+ ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
+ this.metricsManager = metricsManager;
}
@Override
@@ -87,20 +94,31 @@ public void run() {
afterTransformConnect.clear();
List> completableFutures = Lists.newArrayList();
for (String runnerName : eventRecordMap.keySet()) {
+ TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
TransformEngine curTransformEngine = latestTransformMap.get(runnerName);
List curEventRecords = eventRecordMap.get(runnerName);
curEventRecords.forEach(pullRecord -> {
+ long startTime = System.currentTimeMillis();
CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failed,stackTrace-", exception);
+ //failed
+ metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime);
+ metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.FAILED.name(), 1);
errorHandler.handle(pullRecord, exception);
return null;
})
.thenAccept(pushRecord -> {
if (Objects.nonNull(pushRecord)) {
afterTransformConnect.add(pushRecord);
+ metricsManager.eventRuleLatencySeconds(RunnerUtil.getRunnerName(pushRecord), RunnerUtil.getAccountId(circulatorContext, pushRecord),
+ BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime);
+
} else {
offsetManager.commit(pullRecord);
+ // success
+ metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.SUCCESS.name(), 1);
+ metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.SUCCESS.name(),System.currentTimeMillis() - startTime);
}
});
completableFutures.add(transformFuture);
@@ -110,8 +128,11 @@ public void run() {
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
+ //failed
logger.error("transfer event record failed, stackTrace-", exception);
- afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
+ afterTransformConnect.forEach(transferRecord -> {
+ errorHandler.handle(transferRecord, exception);
+ });
}
}
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
index 85dae3b8..1f1602ac 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java
@@ -26,11 +26,14 @@
import java.util.concurrent.ExecutorService;
import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
+import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,12 +50,14 @@ public class EventTargetTrigger extends ServiceThread {
private final OffsetManager offsetManager;
private final ErrorHandler errorHandler;
private volatile Integer batchSize = 100;
+ private BridgeMetricsManager metricsManager;
public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
- ErrorHandler errorHandler) {
+ ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
+ this.metricsManager = metricsManager;
}
@Override
@@ -69,6 +74,8 @@ public void run() {
}
for(String runnerName: targetRecordMap.keySet()){
+ long startTime = System.currentTimeMillis();
+ String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName);
ExecutorService executorService = circulatorContext.getExecutorService(runnerName);
executorService.execute(() -> {
SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
@@ -76,15 +83,21 @@ public void run() {
try {
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
+ metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name() );
} catch (Exception exception) {
logger.error(getServiceName() + " push target exception, stackTrace-", exception);
- triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
+ triggerRecords.forEach(triggerRecord -> {
+ errorHandler.handle(triggerRecord, exception);
+ metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name() );
+
+ });
}
});
}
}
}
+
@Override
public String getServiceName() {
return EventTargetTrigger.class.getSimpleName();
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
index be4db9bb..8fa87fc9 100644
--- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/listener/EventSubscriber.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener;
import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java
new file mode 100644
index 00000000..4a9f8295
--- /dev/null
+++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/utils/RunnerUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtime.utils;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
+import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
+import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
+
+public class RunnerUtil {
+
+ public static String getAccountId(CirculatorContext circulatorContext, ConnectRecord connectRecord) {
+ String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
+ TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
+ return runnerConfig.getAccountId();
+ }
+
+
+ public static String getAccountId(CirculatorContext circulatorContext, String runnerName) {
+ TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
+ return runnerConfig.getAccountId();
+ }
+
+ public static String getRunnerName(ConnectRecord connectRecord) {
+ return connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
+ }
+}
diff --git a/adapter/runtime/src/main/resources/runtime.properties b/adapter/runtime/src/main/resources/runtime.properties
index b99c83d9..39d00e45 100644
--- a/adapter/runtime/src/main/resources/runtime.properties
+++ b/adapter/runtime/src/main/resources/runtime.properties
@@ -18,10 +18,9 @@ rocketmq.namesrvAddr=localhost:9876
rocketmq.consumer.pullTimeOut = 3000
rocketmq.consumer.pullBatchSize=20
rocketmq.cluster.name=DefaultCluster
+rocketmq.consumerGroup=default
## runtime
-rumtimer.name=eventbridge-runtimer
-runtimer.pluginpath=/Users/Local/eventbridge/plugin
-runtimer.storePathRootDir=/Users/Local/eventbridge/store
+rumtime.name=eventbridge-runtimer
## listener
listener.eventQueue.threshold=50000
listener.targetQueue.threshold=50000
\ No newline at end of file
diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
index db4633ec..b0dfa86c 100644
--- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
+++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java
@@ -34,6 +34,9 @@
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.eventbridge.BridgeConfig;
+import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
@@ -85,6 +88,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
private Integer pullTimeOut;
private Integer pullBatchSize;
+ private BridgeConfig bridgeConfig;
private ClientConfig clientConfig;
private SessionCredentials sessionCredentials;
private String socksProxy;
diff --git a/common/pom.xml b/common/pom.xml
index a2e78fc6..89dff4f3 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -23,6 +23,8 @@
8
8
+ 1.19.0
+ 1.19.0-alpha
@@ -68,5 +70,25 @@
assertj-core
test
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-exporter-prometheus
+ ${opentelemetry-exporter-prometheus.version}
+
+
+ io.opentelemetry
+ opentelemetry-exporter-logging
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ ${opentelemetry.version}
+
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java
new file mode 100644
index 00000000..536610bc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongCounter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.metrics;
+
+import io.opentelemetry.api.metrics.ObservableLongCounter;
+
+public class NopLongCounter implements ObservableLongCounter {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java
new file mode 100644
index 00000000..a6d4029b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongHistogram.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.context.Context;
+
+public class NopLongHistogram implements LongHistogram {
+ @Override public void record(long l) {
+
+ }
+
+ @Override public void record(long l, Attributes attributes) {
+
+ }
+
+ @Override public void record(long l, Attributes attributes, Context context) {
+
+ }
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java
new file mode 100644
index 00000000..84427c6c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopLongUpDownCounter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.context.Context;
+
+
+public class NopLongUpDownCounter implements LongUpDownCounter {
+ @Override public void add(long l) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes) {
+
+ }
+
+ @Override public void add(long l, Attributes attributes, Context context) {
+
+ }
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java
new file mode 100644
index 00000000..37a801a5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/metrics/NopObservableLongGauge.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.metrics;
+
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+
+public class NopObservableLongGauge implements ObservableLongGauge {
+}
diff --git a/metrics/pom.xml b/metrics/pom.xml
new file mode 100644
index 00000000..b6f696ab
--- /dev/null
+++ b/metrics/pom.xml
@@ -0,0 +1,134 @@
+
+
+
+
+ rocketmq-eventbridge
+ org.apache.rocketmq
+ 1.0.0
+
+ 4.0.0
+
+ org.apache.rocketmq.eventbridge
+ rocketmq-eventbridge-metrics
+
+ rocketmq-eventbridge-metrics
+
+ http://www.example.com
+
+
+ UTF-8
+ 1.19.0
+ 1.19.0-alpha
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-eventbridge-common
+ 1.0.0
+ provided
+
+
+ org.springframework
+ spring-context
+
+
+ io.opentelemetry
+ opentelemetry-exporter-otlp
+ ${opentelemetry.version}
+
+
+ com.squareup.okio
+ okio-jvm
+
+
+
+
+ io.opentelemetry
+ opentelemetry-exporter-prometheus
+ ${opentelemetry-exporter-prometheus.version}
+
+
+ io.opentelemetry
+ opentelemetry-exporter-logging
+ ${opentelemetry.version}
+
+
+ io.opentelemetry
+ opentelemetry-sdk
+ ${opentelemetry.version}
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+ io.github.aliyunmq
+ rocketmq-shaded-slf4j-api-bridge
+ 1.0.0
+
+
+ org.slf4j
+ jul-to-slf4j
+ 2.0.6
+
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 3.1.0
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+
+ maven-surefire-plugin
+ 2.22.1
+
+
+ maven-jar-plugin
+ 3.0.2
+
+
+ maven-install-plugin
+ 2.5.2
+
+
+ maven-deploy-plugin
+ 2.8.2
+
+
+
+ maven-site-plugin
+ 3.7.1
+
+
+ maven-project-info-reports-plugin
+ 3.0.0
+
+
+
+
+
diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java
new file mode 100644
index 00000000..4bc1d4d7
--- /dev/null
+++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeConfig.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge;
+
+public class BridgeConfig {
+ public enum MetricsExporterType {
+ DISABLE(0),
+ OTLP_GRPC(1),
+ PROM(2),
+ LOG(3);
+
+ private final int value;
+
+ MetricsExporterType(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static MetricsExporterType valueOf(int value) {
+ switch (value) {
+ case 1:
+ return OTLP_GRPC;
+ case 2:
+ return PROM;
+ case 3:
+ return LOG;
+ default:
+ return DISABLE;
+ }
+ }
+
+ public boolean isEnable() {
+ return this.value > 0;
+ }
+ }
+
+ private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+
+ private String metricsGrpcExporterTarget = "";
+ private String metricsGrpcExporterHeader = "";
+ private long metricGrpcExporterTimeOutInMills = 3 * 1000;
+ private long metricGrpcExporterIntervalInMills = 60 * 1000;
+ private long metricLoggingExporterIntervalInMills = 10 * 1000;
+
+ private int metricsPromExporterPort = 5557;
+ private String metricsPromExporterHost = "";
+
+ // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx
+ private String metricsLabel = "";
+
+ private boolean metricsInDelta = false;
+
+ private String eventBridgeAddress;
+
+
+ public MetricsExporterType getMetricsExporterType() {
+ return metricsExporterType;
+ }
+
+ public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
+ this.metricsExporterType = metricsExporterType;
+ }
+
+ public void setMetricsExporterType(int metricsExporterType) {
+ this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
+ }
+
+ public void setMetricsExporterType(String metricsExporterType) {
+ this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
+ }
+
+ public String getMetricsGrpcExporterTarget() {
+ return metricsGrpcExporterTarget;
+ }
+
+ public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) {
+ this.metricsGrpcExporterTarget = metricsGrpcExporterTarget;
+ }
+
+ public String getMetricsGrpcExporterHeader() {
+ return metricsGrpcExporterHeader;
+ }
+
+ public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) {
+ this.metricsGrpcExporterHeader = metricsGrpcExporterHeader;
+ }
+
+ public long getMetricGrpcExporterTimeOutInMills() {
+ return metricGrpcExporterTimeOutInMills;
+ }
+
+ public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) {
+ this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills;
+ }
+
+ public long getMetricGrpcExporterIntervalInMills() {
+ return metricGrpcExporterIntervalInMills;
+ }
+
+ public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) {
+ this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills;
+ }
+
+ public long getMetricLoggingExporterIntervalInMills() {
+ return metricLoggingExporterIntervalInMills;
+ }
+
+ public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) {
+ this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills;
+ }
+
+ public String getMetricsLabel() {
+ return metricsLabel;
+ }
+
+ public void setMetricsLabel(String metricsLabel) {
+ this.metricsLabel = metricsLabel;
+ }
+
+ public boolean isMetricsInDelta() {
+ return metricsInDelta;
+ }
+
+ public void setMetricsInDelta(boolean metricsInDelta) {
+ this.metricsInDelta = metricsInDelta;
+ }
+
+ public int getMetricsPromExporterPort() {
+ return metricsPromExporterPort;
+ }
+
+ public void setMetricsPromExporterPort(int metricsPromExporterPort) {
+ this.metricsPromExporterPort = metricsPromExporterPort;
+ }
+
+ public String getMetricsPromExporterHost() {
+ return metricsPromExporterHost;
+ }
+
+ public void setMetricsPromExporterHost(String metricsPromExporterHost) {
+ this.metricsPromExporterHost = metricsPromExporterHost;
+ }
+
+ public String getEventBridgeAddress() {
+ return eventBridgeAddress;
+ }
+
+ public void setEventBridgeAddress(String eventBridgeAddress) {
+ this.eventBridgeAddress = eventBridgeAddress;
+ }
+}
diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java
new file mode 100644
index 00000000..c77f44e4
--- /dev/null
+++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsConstant.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BridgeMetricsConstant {
+ public static final String OPEN_TELEMETRY_METER_NAME = "bridge-meter";
+ public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size";
+ public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total";
+ public static final String EVENTRULE_FILTER_EVENTS_TOTAL = "eventbridge_eventrule_filter_events_total";
+ public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds";
+
+ public static final String EVENTRULE_TRIGGER_LATENCY = "eventbridge_eventrule_trigger_latency";
+
+
+ public enum Status {
+ SUCCESS("success"),
+ FAILED("failed");
+
+ private String status;
+ Status(String status){
+ this.status = status;
+ }
+ public String getStatus() {
+ return status;
+ }
+ }
+
+}
diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java
new file mode 100644
index 00000000..365a92f0
--- /dev/null
+++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/BridgeMetricsManager.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.rocketmq.eventbridge.metrics.NopLongCounter;
+import org.apache.rocketmq.eventbridge.metrics.NopLongHistogram;
+import org.apache.rocketmq.eventbridge.metrics.NopObservableLongGauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+
+import static org.apache.rocketmq.eventbridge.BridgeMetricsConstant.*;
+
+
+public class BridgeMetricsManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BridgeMetricsManager.class);
+
+ private BridgeConfig bridgeConfig;
+ private OtlpGrpcMetricExporter metricExporter;
+ private PeriodicMetricReader periodicMetricReader;
+ private PrometheusHttpServer prometheusHttpServer;
+ private LoggingMetricExporter loggingMetricExporter;
+ private Meter bridgeMeter;
+
+ // queue stats metrics
+ public static ObservableLongGauge targetGauge = new NopObservableLongGauge();
+ // invoke timeout
+ public static LongHistogram invokeLatency = new NopLongHistogram();
+ // request metrics
+ public static ObservableLongCounter messagesOutTotal = new NopLongCounter();
+
+ public BridgeMetricsManager() {
+ initMetricsProperties();
+ }
+
+ private void initMetricsProperties() {
+ try {
+ Properties properties = PropertiesLoaderUtils.loadAllProperties("metrics.properties");
+ String metricsPromExporterHost = properties.getProperty("metrics.endpoint.host");
+ String metricsPromExporterPort = properties.getProperty("metrics.endpoint.port");
+ String metricsCollectorMode = properties.getProperty("metrics.collector.mode");
+
+ BridgeConfig bridgeConfig = new BridgeConfig();
+ bridgeConfig.setMetricsPromExporterHost(metricsPromExporterHost);
+ bridgeConfig.setMetricsPromExporterPort(Integer.parseInt(metricsPromExporterPort));
+ bridgeConfig.setMetricsExporterType(Integer.parseInt(metricsCollectorMode));
+ this.bridgeConfig = bridgeConfig;
+ } catch (IOException e) {
+ LOGGER.error("init metrics properties exception, stack trace- ", e);
+ }
+ }
+
+ public static AttributesBuilder newAttributesBuilder(Map labels) {
+ AttributesBuilder attributesBuilder = Attributes.builder();
+ labels.forEach(attributesBuilder::put);
+ return attributesBuilder;
+ }
+
+ private boolean checkConfig() {
+ if (bridgeConfig == null) {
+ return false;
+ }
+ BridgeConfig.MetricsExporterType exporterType = bridgeConfig.getMetricsExporterType();
+ if (!exporterType.isEnable()) {
+ return false;
+ }
+
+ switch (exporterType) {
+ case OTLP_GRPC:
+ return StringUtils.isNotBlank(bridgeConfig.getMetricsGrpcExporterTarget());
+ case PROM:
+ return true;
+ case LOG:
+ return true;
+ }
+ return false;
+ }
+
+ public void init() {
+ BridgeConfig.MetricsExporterType metricsExporterType = bridgeConfig.getMetricsExporterType();
+ if (metricsExporterType == BridgeConfig.MetricsExporterType.DISABLE) {
+ return;
+ }
+
+ if (!checkConfig()) {
+ LOGGER.error("check metrics config failed, will not export metrics");
+ return;
+ }
+
+ SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
+ .setResource(Resource.empty());
+
+ if (metricsExporterType == BridgeConfig.MetricsExporterType.OTLP_GRPC) {
+ String endpoint = bridgeConfig.getMetricsGrpcExporterTarget();
+ if (!endpoint.startsWith("http")) {
+ endpoint = "https://" + endpoint;
+ }
+ OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
+ .setEndpoint(endpoint)
+ .setTimeout(bridgeConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
+ .setAggregationTemporalitySelector(type -> {
+ if (bridgeConfig.isMetricsInDelta() &&
+ (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) {
+ return AggregationTemporality.DELTA;
+ }
+ return AggregationTemporality.CUMULATIVE;
+ });
+
+ String headers = bridgeConfig.getMetricsGrpcExporterHeader();
+ if (StringUtils.isNotBlank(headers)) {
+ Map headerMap = new HashMap<>();
+ List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers);
+ for (String item : kvPairs) {
+ String[] split = item.split(":");
+ if (split.length != 2) {
+ LOGGER.warn("metricsGrpcExporterHeader is not valid: {}", headers);
+ continue;
+ }
+ headerMap.put(split[0], split[1]);
+ }
+ headerMap.forEach(metricExporterBuilder::addHeader);
+ }
+
+ metricExporter = metricExporterBuilder.build();
+
+ periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
+ .setInterval(bridgeConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+ .build();
+
+ providerBuilder.registerMetricReader(periodicMetricReader);
+ }
+
+ if (metricsExporterType == BridgeConfig.MetricsExporterType.PROM) {
+ String promExporterHost = bridgeConfig.getMetricsPromExporterHost();
+ if (StringUtils.isBlank(promExporterHost)) {
+ promExporterHost = bridgeConfig.getEventBridgeAddress();
+ }
+ prometheusHttpServer = PrometheusHttpServer.builder()
+ .setHost(promExporterHost)
+ .setPort(bridgeConfig.getMetricsPromExporterPort())
+ .build();
+ providerBuilder.registerMetricReader(prometheusHttpServer);
+ }
+
+ if (metricsExporterType == BridgeConfig.MetricsExporterType.LOG) {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ loggingMetricExporter = LoggingMetricExporter.create(bridgeConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+ java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+ periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
+ .setInterval(bridgeConfig.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+ .build();
+ providerBuilder.registerMetricReader(periodicMetricReader);
+ }
+
+ registerMetricsView(providerBuilder);
+
+ bridgeMeter = OpenTelemetrySdk.builder()
+ .setMeterProvider(providerBuilder.build())
+ .buildAndRegisterGlobal()
+ .getMeter(OPEN_TELEMETRY_METER_NAME);
+
+ }
+
+ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+ // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M
+ List messageSizeBuckets = Arrays.asList(
+ 1d * 1024, //1KB
+ 4d * 1024, //4KB
+ 512d * 1024, //512KB
+ 1d * 1024 * 1024, //1MB
+ 2d * 1024 * 1024, //2MB
+ 4d * 1024 * 1024 //4MB
+ );
+ InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_MESSAGE_SIZE)
+ .build();
+
+ View messageSizeView = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets))
+ .build();
+ providerBuilder.registerView(messageSizeSelector, messageSizeView);
+
+ for (Pair selectorViewPair : getMetricsView()) {
+ providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+ }
+ }
+
+ public static List> getMetricsView() {
+ List rpcCostTimeBuckets = Arrays.asList(
+ (double) Duration.ofMillis(1).toMillis(),
+ (double) Duration.ofMillis(3).toMillis(),
+ (double) Duration.ofMillis(5).toMillis(),
+ (double) Duration.ofMillis(7).toMillis(),
+ (double) Duration.ofMillis(10).toMillis(),
+ (double) Duration.ofMillis(100).toMillis(),
+ (double) Duration.ofSeconds(1).toMillis(),
+ (double) Duration.ofSeconds(2).toMillis(),
+ (double) Duration.ofSeconds(3).toMillis()
+ );
+ InstrumentSelector selector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(EVENTRULE_TRIGGER_LATENCY)
+ .build();
+ View view = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
+ .build();
+ return Lists.newArrayList(new Pair<>(selector, view));
+ }
+
+
+
+ public AttributesBuilder addGroup(Map labels ) {
+ return newAttributesBuilder(labels);
+ }
+
+ private void countMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) {
+ messagesOutTotal = bridgeMeter.counterBuilder(metricsName)
+ .setDescription("Total number of outgoing messages")
+ .buildWithCallback(measurement -> {
+ measurement.record(count, attributesBuilder.build());
+ });
+
+ }
+
+ private void gaugeMetrics(String metricsName, long count, AttributesBuilder attributesBuilder) {
+ targetGauge = bridgeMeter.gaugeBuilder(metricsName)
+ .setDescription("Gauge of total messages ")
+ .ofLongs()
+ .buildWithCallback( measurement -> {
+ measurement.record(count, attributesBuilder.build());
+ });
+ }
+
+ public void eventbusInEventsTotal(String runnerName, String accountId, String status, long count) {
+ Map labelMaps = buildLabelMap(runnerName, accountId, status);
+ AttributesBuilder attributesBuilder = addGroup(labelMaps);
+ countMetrics(EVENTBUS_IN_EVENTS_TOTAL, count, attributesBuilder);
+ }
+
+
+ public void eventruleFilterEventsTotal(String runnerName, String accountId, String status, long count) {
+ Map labelMaps = buildLabelMap(runnerName, accountId, status);
+ AttributesBuilder attributesBuilder = addGroup(labelMaps);
+ countMetrics(EVENTRULE_FILTER_EVENTS_TOTAL, count, attributesBuilder);
+ }
+
+ public void eventRuleLatencySeconds(String runnerName, String accountId ,String status, long latency) {
+ Map labelMaps = buildLabelMap(runnerName, accountId, status);
+ AttributesBuilder attributesBuilder = addGroup(labelMaps);
+ gaugeMetrics(EVENTRULE_LATENCY_SECONDS, latency, attributesBuilder);
+ }
+
+
+ public void countLatencyStat(long latency, String metricsName, String runnerName, String accountId, String status) {
+ invokeLatency = bridgeMeter.histogramBuilder(metricsName)
+ .setDescription("invoke latency")
+ .setUnit("milliseconds")
+ .ofLongs()
+ .build();
+ invokeLatency.record(latency, newAttributesBuilder(buildLabelMap(runnerName, accountId, status)).build());
+ }
+
+
+ public void shutdown() {
+ if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.OTLP_GRPC) {
+ periodicMetricReader.forceFlush();
+ periodicMetricReader.shutdown();
+ metricExporter.shutdown();
+ }
+ if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.PROM) {
+ prometheusHttpServer.forceFlush();
+ prometheusHttpServer.shutdown();
+ }
+ if (bridgeConfig.getMetricsExporterType() == BridgeConfig.MetricsExporterType.LOG) {
+ periodicMetricReader.forceFlush();
+ periodicMetricReader.shutdown();
+ loggingMetricExporter.shutdown();
+ }
+ }
+
+ private Map buildLabelMap(String runnerName, String accountId, String status) {
+ Map labelMap = new HashMap<>();
+ if (StringUtils.isNotBlank(runnerName)) {
+ labelMap.put("runner_name", runnerName);
+ }
+
+ if (StringUtils.isNotBlank(accountId)) {
+
+ labelMap.put("account_id", accountId);
+ }
+ if (StringUtils.isNotBlank(status)) {
+ labelMap.put("status", status);
+ }
+ return labelMap;
+ }
+}
diff --git a/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java
new file mode 100644
index 00000000..4afe22e4
--- /dev/null
+++ b/metrics/src/main/java/org/apache/rocketmq/eventbridge/Pair.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge;
+
+public class Pair {
+ private T1 object1;
+ private T2 object2;
+
+ public Pair(T1 object1, T2 object2) {
+ this.object1 = object1;
+ this.object2 = object2;
+ }
+
+ public T1 getObject1() {
+ return object1;
+ }
+
+ public void setObject1(T1 object1) {
+ this.object1 = object1;
+ }
+
+ public T2 getObject2() {
+ return object2;
+ }
+
+ public void setObject2(T2 object2) {
+ this.object2 = object2;
+ }
+}
diff --git a/metrics/src/main/resources/metrics.properties b/metrics/src/main/resources/metrics.properties
new file mode 100644
index 00000000..1b9bd52a
--- /dev/null
+++ b/metrics/src/main/resources/metrics.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## metrics
+metrics.endpoint.host=127.0.0.1
+metrics.endpoint.port=19090
+metrics.collector.mode=2
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f3d1bcbf..ed9d4bf9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
common
infrastructure
test
+ metrics
diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties
index 5911b302..37a0a5e2 100644
--- a/start/src/main/resources/application.properties
+++ b/start/src/main/resources/application.properties
@@ -34,7 +34,6 @@ runtime.storage.mode=ROCKETMQ
rumtime.name=eventbridge-runtimer
runtime.pluginpath=~/eventbridge/plugin
-
## log
app.name=rocketmqeventbridge
log.level=INFO