diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 477e4b4e..2644308b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.boot; -import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; @@ -25,6 +24,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; @@ -33,6 +33,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,9 +92,11 @@ public void run() { TransformEngine curTransformEngine = latestTransformMap.get(runnerName); List curEventRecords = eventRecordMap.get(runnerName); curEventRecords.forEach(pullRecord -> { + AtomicReference resultException = new AtomicReference<>(); CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { LOGGER.error("transfer do transform event record failed, stackTrace-", exception); + resultException.set(exception); errorHandler.handle(pullRecord, exception); return null; }) @@ -103,6 +107,7 @@ public void run() { offsetManager.commit(pullRecord); } }); + exportMetrics(pullRecord, runnerName, resultException); completableFutures.add(transformFuture); }); } @@ -117,6 +122,33 @@ public void run() { } } + private static void exportMetrics(ConnectRecord connectRecord, String ruleName, AtomicReference resultException) { + String status = "success"; + if (resultException.get() != null) { + status = "failed"; + resultException.set(null); + } + EventBridgeMetricsManager.observableDoubleGauge.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, status) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build()); + + EventBridgeMetricsManager.eventbridgeEventRuleLagEventsTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, status) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build()); + } + + + @Override public void start() { thread.start(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index b9d175b9..fcda2112 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -31,6 +31,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,15 +76,39 @@ public void run() { try { sinkTask.put(triggerRecords); offsetManager.commit(triggerRecords); + exportMetrics(triggerRecords.get(0), "success"); } catch (Exception exception) { LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception); triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception)); + exportMetrics(triggerRecords.get(0), "failed"); } }); } } } + private static void exportMetrics(ConnectRecord connectRecord, String status) { + EventBridgeMetricsManager.eventbridgeEventsTriggerLatency.update(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + + EventBridgeMetricsManager.eventbridgeEventsLatency.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + @Override public String getServiceName() { return EventTargetTrigger.class.getSimpleName(); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java index 35bc7ef8..892aa6b5 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl; +import lombok.Getter; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback; @@ -29,6 +30,9 @@ public class DefaultSendCallback implements SendCallback { PutEventsResponseEntry entry = new PutEventsResponseEntry(); + @Getter + private String status; + public DefaultSendCallback(PutEventCallback putEventCallback) { this.putEventCallback = putEventCallback; } @@ -37,6 +41,7 @@ public DefaultSendCallback(PutEventCallback putEventCallback) { public void onSuccess(SendResult sendResult) { entry.setEventId(sendResult.getMsgId()); entry.setErrorCode(DefaultErrorCode.Success.getCode()); + status = DefaultErrorCode.Success.getCode(); putEventCallback.endProcess(entry); } @@ -44,6 +49,7 @@ public void onSuccess(SendResult sendResult) { public void onException(Throwable throwable) { entry.setErrorCode(DefaultErrorCode.InternalError.getCode()); entry.setErrorMessage(throwable.getMessage()); + status = DefaultErrorCode.InternalError.getCode(); putEventCallback.endProcess(entry); } } diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java index 5b1d796c..3e8b12b3 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java @@ -30,6 +30,8 @@ import org.apache.rocketmq.eventbridge.domain.storage.EventDataRepository; import org.apache.rocketmq.eventbridge.event.EventBridgeEvent; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Repository; @@ -74,14 +76,34 @@ public boolean putEvent(String accountId, String eventBusName, EventBridgeEvent PutEventCallback putEventCallback) { String topicName = this.getTopicName(accountId, eventBusName); Message msg = eventDataOnRocketMQConnectAPI.converter(accountId, topicName, eventBridgeEvent); + DefaultSendCallback sendCallback = new DefaultSendCallback(putEventCallback); try { - producer.send(msg, new DefaultSendCallback(putEventCallback), 1000L); + producer.send(msg, sendCallback, 1000L); + exportMetrics(accountId, eventBusName, eventBridgeEvent, sendCallback); } catch (Throwable e) { throw new EventBridgeException(EventBridgeErrorCode.InternalError, e); } return true; } + private static void exportMetrics(String accountId, String eventBusName, EventBridgeEvent eventBridgeEvent, DefaultSendCallback sendCallback) { + EventBridgeMetricsManager.eventbridgePutEventsLatency.update(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build()); + + EventBridgeMetricsManager.eventbridgePutEventsSize.update(Double.valueOf(String.valueOf(eventBridgeEvent.getData().length)), + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build()); + } + @Override public String getEventBusPersistentContext(String accountId, String eventBusName) { EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName); diff --git a/domain/pom.xml b/domain/pom.xml index d0724819..735d529d 100644 --- a/domain/pom.xml +++ b/domain/pom.xml @@ -23,7 +23,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure diff --git a/infrastructure/pom.xml b/infrastructure/pom.xml index 661455d9..a50a9d91 100644 --- a/infrastructure/pom.xml +++ b/infrastructure/pom.xml @@ -33,6 +33,33 @@ org.springframework.boot spring-boot-starter-webflux + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + io.opentelemetry + opentelemetry-exporter-logging + + + + io.opentelemetry + opentelemetry-exporter-prometheus + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + + + io.prometheus + simpleclient + \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java new file mode 100644 index 00000000..5bdc69ec --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + + +public interface Counter extends Metric { + + default void inc(P2 attachment){} + + default void inc(P1 n, P2 attachment){} + + @Override + default MetricType getMetricType() { + return MetricType.COUNTER; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java new file mode 100644 index 00000000..894bdacf --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleGauge; + + +public class DoubleGauge implements Gauge { + + private io.opentelemetry.api.metrics.DoubleGauge doubleGauge = new NopDoubleGauge(); + + private final String metricName; + + public DoubleGauge(String metricName) { + this.metricName = metricName; + } + + @Override + public void set(Double value, Attributes attachment) { + doubleGauge.set(value, attachment); + } + + @Override + public io.opentelemetry.api.metrics.DoubleGauge getValue() { + return doubleGauge; + } + + @Override + public String getMetricName() { + return this.metricName; + } + + @Override + public void setInstrument(io.opentelemetry.api.metrics.DoubleGauge instrument) { + this.doubleGauge = instrument; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java new file mode 100644 index 00000000..e595ec1f --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleHistogram; + +public class DoubleHistogram implements Histogram { + + private io.opentelemetry.api.metrics.DoubleHistogram doubleHistogram = new NopDoubleHistogram(); + + private final String metricName; + + public DoubleHistogram(String metricName) { + this.metricName = metricName; + } + + @Override + public void update(Double value, Attributes attachment) { + doubleHistogram.record(value, attachment); + } + + @Override + public io.opentelemetry.api.metrics.DoubleHistogram getValue() { + return doubleHistogram; + } + + @Override + public String getMetricName() { + return this.metricName; + } + + @Override + public void setInstrument(io.opentelemetry.api.metrics.DoubleHistogram instrument) { + this.doubleHistogram = instrument; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java new file mode 100644 index 00000000..cb8b0885 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopObservableDoubleGauge; + +public class DoubleObserverGauge implements ObservableGauge { + + private io.opentelemetry.api.metrics.ObservableDoubleMeasurement observableDoubleGauge = new NopObservableDoubleGauge(); + + private final String metricName; + + public DoubleObserverGauge(String metricName) { + this.metricName = metricName; + } + + + @Override + public void set(Double value, Attributes attachment) { + observableDoubleGauge.record(value, attachment); + } + + @Override + public ObservableDoubleMeasurement getValue() { + return observableDoubleGauge; + } + + + @Override + public String getMetricName() { + return this.metricName; + } + + @Override + public void setInstrument(ObservableDoubleMeasurement instrument) { + this.observableDoubleGauge = instrument; + } +} \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java new file mode 100644 index 00000000..8d74de26 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public class EventBridgeMetricsConstant { + + public static final String HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY = "eventbridge_putevents_latency"; + + public static final String HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE = "eventbridge_putevents_size"; + + public static final String GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY = "eventbridge_event_rule_latency"; + + public static final String COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL = "eventbridge_event_rule_lag_events_total"; + + public static final String COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL = "eventbridge_events_transfer_in_total"; + + public static final String COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL = "eventbridge_events_transfer_out_total"; + + public static final String GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY = "eventbridge_events_transfer_latency"; + + public static final String HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY = "eventbridge_events_trigger_latency"; + + public static final String GAUGE_EVENTBRIDGE_EVENTS_LATENCY = "eventbridge_events_latency"; + + //状态 + public static final String LABEL_STATUS = "status"; + + //名称 + public static final String LABEL_EVENT_BUS_NAME = "event_bus_name"; + + //账户Id + public static final String LABEL_ACCOUNT_ID = "accountId"; + + //件源 + public static final String LABEL_EVENT_SOURCE = "event_source"; + + //事件类型 + public static final String LABEL_EVENT_TYPE = "event_type"; + + //规则名称 + public static final String LABEL_EVENT_RULE_NAME = "event_rule_name"; + + //转换类型 + public static final String LABEL_TRANSFORM_TYPE = "transform_type"; + + //目标名称 + public static final String LABEL_EVENT_TARGET_NAME = "event_target_name"; + + //目标类型 + public static final String LABEL_EVENT_TARGET_TYPE = "event_target_type"; +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java new file mode 100644 index 00000000..988a0b00 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import lombok.experimental.UtilityClass; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE; + +@UtilityClass +public class EventBridgeMetricsManager { + + public static Supplier attributesBuilderSupplier = Attributes::builder; + + public final static List metrics = new ArrayList<>(); + + public static Histogram eventbridgePutEventsLatency = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY); + + public static Histogram eventbridgePutEventsSize = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE); + + public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY); + + public static Counter eventbridgeEventRuleLagEventsTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL); + + public static Counter eventbridgeEventsTransferInTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL); + + public static Counter eventbridgeEventsTransferOutTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL); + + public static ObservableGauge eventbridgeEventsTransferLatency = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY); + + public static Histogram eventbridgeEventsTriggerLatency = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY); + + public static ObservableGauge eventbridgeEventsLatency = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENTS_LATENCY); + + private final static Map LABEL_MAP = new HashMap<>(); + + static { + metrics.add(eventbridgePutEventsLatency); + metrics.add(eventbridgePutEventsSize); + metrics.add(observableDoubleGauge); + metrics.add(eventbridgeEventRuleLagEventsTotal); + metrics.add(eventbridgeEventsTransferInTotal); + metrics.add(eventbridgeEventsTransferOutTotal); + metrics.add(eventbridgeEventsTransferLatency); + metrics.add(eventbridgeEventsTriggerLatency); + metrics.add(eventbridgeEventsLatency); + } + + public static AttributesBuilder newAttributesBuilder() { + AttributesBuilder attributesBuilder; + if (attributesBuilderSupplier == null) { + attributesBuilderSupplier = Attributes::builder; + } + attributesBuilder = attributesBuilderSupplier.get(); + LABEL_MAP.forEach(attributesBuilder::put); + return attributesBuilder; + } + + public static void initMetricsView(Meter brokerMeter) { + for(Metric metric : metrics) { + if (metric instanceof Counter) { + metric.setInstrument(brokerMeter.counterBuilder(metric.getMetricName()).build());; + } else if (metric instanceof ObservableCounter) { + metric.setInstrument(brokerMeter.counterBuilder(metric.getMetricName()).buildObserver()); + } else if (metric instanceof Histogram) { + metric.setInstrument(brokerMeter.histogramBuilder(metric.getMetricName()).build()); + } else if (metric instanceof Gauge) { + metric.setInstrument(brokerMeter.gaugeBuilder(metric.getMetricName()).build()); + } else if (metric instanceof ObservableGauge) { + metric.setInstrument(brokerMeter.gaugeBuilder(metric.getMetricName()).buildObserver()); + } + } + + } + +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java new file mode 100644 index 00000000..bbac4b99 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + + +public interface Gauge extends Metric { + + default N getValue(){return null;} + + void set(P1 l, P2 attributes); + + @Override + default MetricType getMetricType() { + return MetricType.GAUGE; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java new file mode 100644 index 00000000..35477396 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public interface Histogram extends Metric { + + default void update(P1 value, P2 attachment){} + + default R getValue(){return null;} + + @Override + default MetricType getMetricType() { + return MetricType.HISTOGRAM; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java new file mode 100644 index 00000000..eebe6443 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopLongCounter; + +public class LongCounter implements Counter { + + private io.opentelemetry.api.metrics.LongCounter longCounter = new NopLongCounter(); + + private final String metricName; + + public LongCounter(String metricName) { + this.metricName = metricName; + } + + + @Override + public void inc(Attributes attachment) { + longCounter.add(1, attachment); + } + + @Override + public void inc(Long n, Attributes attachment) { + longCounter.add(n, attachment); + } + + @Override + public String getMetricName() { + return this.metricName; + } + + @Override + public void setInstrument(io.opentelemetry.api.metrics.LongCounter instrument) { + this.longCounter = instrument; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java new file mode 100644 index 00000000..5f65862d --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopObservableLongCounter; + +public class LongObserverCounter implements ObservableCounter { + + private io.opentelemetry.api.metrics.ObservableLongMeasurement observableLongCounter = new NopObservableLongCounter(); + + private final String metricName; + + public LongObserverCounter(String metricName) { + this.metricName = metricName; + } + + @Override + public void inc(Attributes attachment) { + observableLongCounter.record(1, attachment); + } + + @Override + public void inc(Long n, Attributes attachment) { + observableLongCounter.record(n, attachment); + } + + @Override + public String getMetricName() { + return this.metricName; + } + + @Override + public void setInstrument(ObservableLongMeasurement instrument) { + this.observableLongCounter = instrument; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java new file mode 100644 index 00000000..b532b782 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public interface Metric { + default MetricType getMetricType() { + throw new UnsupportedOperationException("Custom metric type is not supported."); + } + + String getMetricName(); + + void setInstrument(R instrument); +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java new file mode 100644 index 00000000..c613c14e --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@ConfigurationProperties(prefix="metrics") +@EnableConfigurationProperties +@Configuration +public class MetricConfig { + + private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE; + + private String labels; + + private boolean inDelta = false; + + private int otelCardinalityLimit = 50 * 1000; + + private String grpcExporterTarget = ""; + + private String grpcExporterHeader = ""; + + private long grpcExporterTimeOutInMills = 3 * 1000; + + private long grpcExporterIntervalInMills = 60 * 1000; + + private long loggingExporterIntervalInMills = 10 * 1000; + + private int metricsOtelCardinalityLimit = 50 * 1000; + + private int promExporterPort = 5557; + + private String promExporterHost = "localhost"; +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java similarity index 51% rename from infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java index 6311d2a9..a01b88b3 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java @@ -8,24 +8,18 @@ * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.eventbridge.infrastructure.metric; -import java.util.Map; - -public class MonitorFactory { - - public void createSpan(Map content){ - - } - - public void finishSpan(Map content){ - - } +public enum MetricType { + COUNTER, + METER, + GAUGE, + HISTOGRAM } diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java new file mode 100644 index 00000000..15868480 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import com.google.common.base.Splitter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.resources.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE; + +@Slf4j +public class MetricsCollectorFactory { + private static final String OPEN_TELEMETRY_METER_NAME = "eventbridge-meter"; + + private static class MetricsCollectorHolder{ + static MetricsCollectorFactory instance = new MetricsCollectorFactory(); + } + + public static MetricsCollectorFactory getInstance(){ + return MetricsCollectorFactory.MetricsCollectorHolder.instance; + } + + + private MetricsCollectorFactory() { + + } + + public void start(MetricConfig metricConfig) { + + if (!checkConfig(metricConfig)) { + log.error("check metrics config failed, will not export metrics"); + return; + } + + MetricsExporterType metricsExporterType = metricConfig.getMetricsExporterType(); + if (metricsExporterType == MetricsExporterType.DISABLE) { + return; + } + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(Resource.empty()); + PeriodicMetricReader periodicMetricReader; + if (metricsExporterType == MetricsExporterType.OTLP_GRPC) { + String endpoint = metricConfig.getGrpcExporterTarget(); + if (!endpoint.startsWith("http")) { + endpoint = "https://" + endpoint; + } + OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(metricConfig.getGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setAggregationTemporalitySelector(type -> { + if (metricConfig.isInDelta() && + (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { + return AggregationTemporality.DELTA; + } + return AggregationTemporality.CUMULATIVE; + }); + + String headers = metricConfig.getGrpcExporterHeader(); + if (StringUtils.isNotBlank(headers)) { + Map headerMap = new HashMap<>(); + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + log.warn("metricsGrpcExporterHeader is not valid: {}", headers); + continue; + } + headerMap.put(split[0], split[1]); + } + headerMap.forEach(metricExporterBuilder::addHeader); + } + + OtlpGrpcMetricExporter metricExporter = metricExporterBuilder.build(); + + periodicMetricReader = PeriodicMetricReader.builder(metricExporter) + .setInterval(metricConfig.getGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + + providerBuilder.registerMetricReader(periodicMetricReader); + } + PrometheusHttpServer prometheusHttpServer; + if (metricsExporterType == MetricsExporterType.PROM) { + String promExporterHost = metricConfig.getPromExporterHost(); + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(metricConfig.getPromExporterPort()) + .build(); + providerBuilder.registerMetricReader(prometheusHttpServer); + } + + MetricExporter loggingMetricExporter; + if (metricsExporterType == MetricsExporterType.LOG) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + loggingMetricExporter = OtlpJsonLoggingMetricExporter.create(metricConfig.isInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); + Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); + periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) + .setInterval(metricConfig.getLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + providerBuilder.registerMetricReader(periodicMetricReader); + } + + registerMetricsView(providerBuilder, metricConfig); + + Meter brokerMeter = OpenTelemetrySdk.builder() + .setMeterProvider(providerBuilder.build()) + .build() + .getMeter(OPEN_TELEMETRY_METER_NAME); + EventBridgeMetricsManager.initMetricsView(brokerMeter); + } + + public static boolean checkConfig(MetricConfig metricConfig) { + if (metricConfig == null) { + return false; + } + MetricsExporterType exporterType = metricConfig.getMetricsExporterType(); + if (!exporterType.isEnable()) { + return false; + } + + switch (exporterType) { + case OTLP_GRPC: + return StringUtils.isNotBlank(metricConfig.getGrpcExporterTarget()); + case PROM: + return true; + case LOG: + return true; + } + return false; + } + + private void registerMetricsView(SdkMeterProviderBuilder providerBuilder, MetricConfig metricConfig) { + + //putevents latency buckets + List puteventsLatencyBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(20).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofMillis(1000).toMillis(), + (double) Duration.ofMillis(5000).toMillis(), + (double) Duration.ofSeconds(10000).toMillis() + ); + InstrumentSelector puteventsLatencyBucketsSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY) + .build(); + ViewBuilder puteventsLatencyBucketsView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(puteventsLatencyBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(puteventsLatencyBucketsView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(puteventsLatencyBucketsSelector, puteventsLatencyBucketsView.build()); + + + // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M + List messageSizeBuckets = Arrays.asList( + 1d * 1024, //1KB + 4d * 1024, //4KB + 512d * 1024, //512KB + 1d * 1024 * 1024, //1MB + 2d * 1024 * 1024, //2MB + 4d * 1024 * 1024 //4MB + ); + + InstrumentSelector messageSizeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE) + .build(); + + ViewBuilder messageSizeView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(messageSizeView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(messageSizeSelector, messageSizeView.build()); + + //events trigger latency + List eventsTriggerLatencyBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(20).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofMillis(1000).toMillis(), + (double) Duration.ofMillis(5000).toMillis(), + (double) Duration.ofSeconds(10000).toMillis() + ); + InstrumentSelector eventsTriggerLatencySelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY) + .build(); + ViewBuilder eventsTriggerLatencyView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(eventsTriggerLatencyBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(eventsTriggerLatencyView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(eventsTriggerLatencySelector, eventsTriggerLatencyView.build()); + } + +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java new file mode 100644 index 00000000..5e071579 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public enum MetricsExporterType { + DISABLE(0), + OTLP_GRPC(1), + PROM(2), + LOG(3); + + private final int value; + + MetricsExporterType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static MetricsExporterType valueOf(int value) { + switch (value) { + case 1: + return OTLP_GRPC; + case 2: + return PROM; + case 3: + return LOG; + default: + return DISABLE; + } + } + + public boolean isEnable() { + return this.value > 0; + } +} \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java new file mode 100644 index 00000000..77a19c86 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + + +public interface ObservableCounter extends Metric { + + default void inc(P2 attachment){} + + default void inc(P1 n, P2 attachment){} + + @Override + default MetricType getMetricType() { + return MetricType.COUNTER; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java new file mode 100644 index 00000000..1e31c98d --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + + +public interface ObservableGauge extends Metric { + + default N getValue(){return null;} + + void set(P1 l, P2 attributes); + + @Override + default MetricType getMetricType() { + return MetricType.GAUGE; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java new file mode 100644 index 00000000..8255533b --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleGauge; +import io.opentelemetry.context.Context; + +public class NopDoubleGauge implements DoubleGauge { + + @Override + public void set(double l) { + + } + + @Override + public void set(double l, Attributes attributes) { + + } + + @Override + public void set(double l, Attributes attributes, Context context) { + + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java new file mode 100644 index 00000000..34a51c29 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.context.Context; + +public class NopDoubleHistogram implements DoubleHistogram { + + @Override + public void record(double l) { + + } + + @Override + public void record(double l, Attributes attributes) { + + } + + @Override + public void record(double l, Attributes attributes, Context context) { + + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java new file mode 100644 index 00000000..05c66dad --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.context.Context; + +public class NopLongCounter implements LongCounter { + + @Override + public void add(long l) { + + } + + @Override + public void add(long l, Attributes attributes) { + + } + + @Override + public void add(long l, Attributes attributes, Context context) { + + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java new file mode 100644 index 00000000..3d5f86dd --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; + +public class NopObservableDoubleGauge implements ObservableDoubleMeasurement { + + @Override + public void record(double l) { + + } + + @Override + public void record(double l, Attributes attributes) { + + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java new file mode 100644 index 00000000..70ab74b8 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; + +public class NopObservableLongCounter implements ObservableLongMeasurement{ + + @Override + public void record(long v) { + + } + + @Override + public void record(long v, Attributes attributes) { + + } +} diff --git a/pom.xml b/pom.xml index c7965e8f..b2ccb1b2 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ 3.9.0 2.3.0 1.10.0 + 4.5.0-M2 2.13.0 5.9.2 2.9.3 @@ -96,6 +97,9 @@ 5.1.0 8.5.7 1.18.20 + 1.39.0 + 1.39.0-alpha + 0.16.0 @@ -282,6 +286,12 @@ commons-text ${apache.commons-text.version} + + + org.apache.commons + commons-collections4 + ${apache.commons-collections4.version} + com.github.ben-manes.caffeine caffeine @@ -317,6 +327,43 @@ ${rocketmq.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-prometheus + ${opentelemetry.exporter.prometheus.version} + + + + io.prometheus + simpleclient + ${simpleclient.version} + + junit diff --git a/start/pom.xml b/start/pom.xml index 544a9689..bf56c396 100644 --- a/start/pom.xml +++ b/start/pom.xml @@ -88,6 +88,26 @@ com.alibaba fastjson + + org.apache.rocketmq + connect-eventbridge-transform + 1.1.0 + + + org.apache.rocketmq + connect-eventbridge-transform + 1.1.0 + + + org.apache.rocketmq + connect-filter-transform + 1.1.0 + + + org.apache.rocketmq + eventbridge-connect-file + 1.1.0 + diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java b/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java new file mode 100644 index 00000000..2798b174 --- /dev/null +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.config; + +import org.apache.rocketmq.eventbridge.infrastructure.metric.MetricConfig; +import org.apache.rocketmq.eventbridge.infrastructure.metric.MetricsCollectorFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MetricsCollectorConfig implements InitializingBean { + + @Autowired + private MetricConfig metricConfig; + + @Override + public void afterPropertiesSet() throws Exception { + MetricsCollectorFactory.getInstance().start(metricConfig); + } +} diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 8daf91c5..408b9f37 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -38,4 +38,7 @@ runtime.pluginpath=./plugin ## log app.name=rocketmqeventbridge log.level=INFO -log.path=~/logs \ No newline at end of file +log.path=~/logs + +## metrics +metrics.metrics-exporter-type=PROM \ No newline at end of file diff --git a/supports/connect-eventbridge-transform/pom.xml b/supports/connect-eventbridge-transform/pom.xml index 36b78b8e..358fa405 100644 --- a/supports/connect-eventbridge-transform/pom.xml +++ b/supports/connect-eventbridge-transform/pom.xml @@ -40,7 +40,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure 1.1.0 diff --git a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java index 3b140212..136d602a 100644 --- a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java +++ b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java @@ -22,6 +22,8 @@ import io.openmessaging.connector.api.component.ComponentContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.SchemaBuilder; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.apache.rocketmq.eventbridge.tools.transform.*; import java.util.Map; @@ -46,9 +48,32 @@ public ConnectRecord doTransform(ConnectRecord record) { record.addExtension(entry.getKey(), ((StringData) data).getData()); } }); + exportMetrics(record); return record; } + private static void exportMetrics(ConnectRecord connectRecord) { + EventBridgeMetricsManager.eventbridgeEventsTransferOutTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + + EventBridgeMetricsManager.eventbridgeEventsTransferLatency.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + @Override public void validate(KeyValue config) { diff --git a/supports/connect-filter-transform/pom.xml b/supports/connect-filter-transform/pom.xml index 1ea72933..e565de42 100644 --- a/supports/connect-filter-transform/pom.xml +++ b/supports/connect-filter-transform/pom.xml @@ -47,7 +47,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure ${project.version} diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java index 7105fa7a..fd6b5537 100644 --- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java +++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java @@ -23,6 +23,8 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.ComponentContext; import io.openmessaging.connector.api.data.ConnectRecord; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluator; import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluatorBuilder; @@ -34,6 +36,7 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap @Override public ConnectRecord doTransform(ConnectRecord record) { + exportMetrics(record); if (!evaluator.evaluateData(new Gson().toJson(record.getData()))) { return null; } else if (!evaluator.evaluateSpecAttr(this.buildSpecAttr(record))) { @@ -45,6 +48,18 @@ public ConnectRecord doTransform(ConnectRecord record) { } } + private static void exportMetrics(ConnectRecord connectRecord) { + EventBridgeMetricsManager.eventbridgeEventsTransferInTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + private Map buildSpecAttr(ConnectRecord record) { Map extensionsAttrs = Maps.newHashMap(); SpecVersion.V1.getAllAttributes() diff --git a/test/rocketmq-eventbridge-e2etest/pom.xml b/test/rocketmq-eventbridge-e2etest/pom.xml index dab2c2c2..d994f6c7 100644 --- a/test/rocketmq-eventbridge-e2etest/pom.xml +++ b/test/rocketmq-eventbridge-e2etest/pom.xml @@ -27,6 +27,16 @@ + + org.jetbrains.kotlin + kotlin-stdlib + 1.3.70 + + + com.squareup.okhttp3 + okhttp + 4.2.0 + org.apache.rocketmq rocketmq-eventbridge-start @@ -67,11 +77,13 @@ connect-filter-transform 1.1.0 + org.apache.rocketmq eventbridge-connect-file 1.1.0 + \ No newline at end of file diff --git a/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java b/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java new file mode 100644 index 00000000..16327bbb --- /dev/null +++ b/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java @@ -0,0 +1,42 @@ +package org.apache.rocketmq.eventbridge.e2etest.controller; + +import okhttp3.*; + +import java.io.IOException; + +public class PostJsonExample { + public static void main(String[] args) throws InterruptedException { + + // 构建 JSON 请求体 + + OkHttpClient client = new OkHttpClient(); + RequestBody body = RequestBody.create(MediaType.parse("application/json"), "A test recrod."); + + // 创建 POST 请求 + Request request = new Request.Builder() + .url("http://localhost:7001/putEvents") + .addHeader("Content-Type", "application/json") + .addHeader("ce-specversion","1.0") + .addHeader("ce-type", "com.github.pull_request.opened") + .addHeader("ce-source", "https://github.com/cloudevents/spec/pull") + .addHeader("ce-subject", "demo") + .addHeader("ce-id", "1234-1234-1234") + .addHeader("ce-datacontenttype", "application/json") + .addHeader("ce-time","2018-04-05T17:31:00Z") + .addHeader("ce-eventbusname", "demo-bus") + .post(body) + .build(); + + // 发送同步 POST 请求 + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful()) { + System.out.println(response.body().string()); + } else { + System.err.println("Request failed: " + response.code()); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} +