From ad677841b3b7e12266b08f49f0ee30dc6d4fffe3 Mon Sep 17 00:00:00 2001 From: wangkai Date: Sat, 21 Sep 2024 16:32:29 +0800 Subject: [PATCH 1/8] metrics --- metrics/metrics-api/pom.xml | 30 ++++++++++++++++++++++++++++++ metrics/pom.xml | 25 +++++++++++++++++++++++++ pom.xml | 1 + 3 files changed, 56 insertions(+) create mode 100644 metrics/metrics-api/pom.xml create mode 100644 metrics/pom.xml diff --git a/metrics/metrics-api/pom.xml b/metrics/metrics-api/pom.xml new file mode 100644 index 00000000..fa80ed0b --- /dev/null +++ b/metrics/metrics-api/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + org.apache.rocketmq + metrics + 1.1.0 + ../pom.xml + + + rocketmq-eventbridge-metrics-api + + + 17 + 17 + UTF-8 + + + + io.micrometer + micrometer-core + + + com.tdunning + t-digest + + + \ No newline at end of file diff --git a/metrics/pom.xml b/metrics/pom.xml new file mode 100644 index 00000000..799d1c69 --- /dev/null +++ b/metrics/pom.xml @@ -0,0 +1,25 @@ + + + 4.0.0 + + org.apache.rocketmq + rocketmq-eventbridge + 1.1.0 + ../pom.xml + + + metrics + pom + + metrics-api + + + + 17 + 17 + UTF-8 + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index c7965e8f..0beae68f 100644 --- a/pom.xml +++ b/pom.xml @@ -109,6 +109,7 @@ supports/eventbridge-connect-file test dist + metrics From ed23f5780fe21f293af487e2b2887d3c90325289 Mon Sep 17 00:00:00 2001 From: wangkai Date: Sun, 22 Sep 2024 23:10:38 +0800 Subject: [PATCH 2/8] add metrics --- metrics/{metrics-api => metrics-core}/pom.xml | 12 +----- .../rocketmq/eventbridge/metrics/Counter.java | 38 +++++++++++++++++++ .../eventbridge/metrics/Histogram.java | 33 ++++++++++++++++ .../metrics/HistogramStatistics.java | 36 ++++++++++++++++++ .../rocketmq/eventbridge/metrics/Meter.java | 35 +++++++++++++++++ .../rocketmq/eventbridge/metrics/Metric.java | 24 ++++++++++++ .../eventbridge/metrics/MetricType.java | 25 ++++++++++++ .../rocketmq/eventbridge/metrics/View.java | 27 +++++++++++++ metrics/pom.xml | 2 +- 9 files changed, 220 insertions(+), 12 deletions(-) rename metrics/{metrics-api => metrics-core}/pom.xml (66%) create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java diff --git a/metrics/metrics-api/pom.xml b/metrics/metrics-core/pom.xml similarity index 66% rename from metrics/metrics-api/pom.xml rename to metrics/metrics-core/pom.xml index fa80ed0b..649ef68d 100644 --- a/metrics/metrics-api/pom.xml +++ b/metrics/metrics-core/pom.xml @@ -10,21 +10,11 @@ ../pom.xml - rocketmq-eventbridge-metrics-api + rocketmq-eventbridge-metrics-core 17 17 UTF-8 - - - io.micrometer - micrometer-core - - - com.tdunning - t-digest - - \ No newline at end of file diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java new file mode 100644 index 00000000..7c14287f --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.eventbridge.metrics; + + +public interface Counter extends Metric { + + void inc(); + + void inc(long n); + + void dec(); + + void dec(long n); + + long getCount(); + + @Override + default MetricType getMetricType() { + return MetricType.COUNTER; + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java new file mode 100644 index 00000000..e3263459 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public interface Histogram extends Metric { + + void update(long value); + + long getCount(); + + HistogramStatistics getStatistics(); + + @Override + default MetricType getMetricType() { + return MetricType.HISTOGRAM; + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java new file mode 100644 index 00000000..deb36076 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public abstract class HistogramStatistics { + + public abstract double getQuantile(double quantile); + + public abstract long[] getValues(); + + public abstract int size(); + + public abstract double getMean(); + + public abstract double getStdDev(); + + public abstract long getMax(); + + public abstract long getMin(); +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java new file mode 100644 index 00000000..2298c665 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public interface Meter extends Metric { + + void markEvent(); + + void markEvent(long n); + + double getRate(); + + long getCount(); + + @Override + default MetricType getMetricType() { + return MetricType.METER; + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java new file mode 100644 index 00000000..0a140dd6 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public interface Metric { + default MetricType getMetricType() { + throw new UnsupportedOperationException("Custom metric type is not supported."); + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java new file mode 100644 index 00000000..06e9f679 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public enum MetricType { + COUNTER, + METER, + GAUGE, + HISTOGRAM +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java new file mode 100644 index 00000000..f3fa73f0 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +public interface View { + /** The interval in which metrics are updated. */ + int UPDATE_INTERVAL_SECONDS = 5; + + /** This method will be called regularly to update the metric. */ + void update(); +} diff --git a/metrics/pom.xml b/metrics/pom.xml index 799d1c69..57f496ce 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -13,7 +13,7 @@ metrics pom - metrics-api + metrics-core From f29170c3aa223e3acfc5bfd011759e217e4c9409 Mon Sep 17 00:00:00 2001 From: wangkai Date: Sun, 22 Sep 2024 23:11:39 +0800 Subject: [PATCH 3/8] update metrics --- .../java/org/apache/rocketmq/eventbridge/metrics/View.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java index f3fa73f0..b5d98d50 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java @@ -19,9 +19,8 @@ package org.apache.rocketmq.eventbridge.metrics; public interface View { - /** The interval in which metrics are updated. */ - int UPDATE_INTERVAL_SECONDS = 5; - /** This method will be called regularly to update the metric. */ + int UPDATE_INTERVAL_SECONDS = 5; + void update(); } From 25cdc436d2e5bdce524f12d1a9f05e8451cee424 Mon Sep 17 00:00:00 2001 From: wangkai Date: Sun, 22 Sep 2024 23:41:54 +0800 Subject: [PATCH 4/8] update metrics --- .../main/java/org/apache/rocketmq/eventbridge/metrics/View.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java index b5d98d50..87a9ddaa 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java @@ -21,6 +21,6 @@ public interface View { int UPDATE_INTERVAL_SECONDS = 5; - + void update(); } From 4f7e3410c68af6ee4500f6fdf3783549d64ae7e0 Mon Sep 17 00:00:00 2001 From: wangkai Date: Tue, 5 Nov 2024 14:43:17 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=8C=87=E6=A0=87?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=B9=B6=E9=9B=86=E6=88=90OpenTelemetry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 重命名和重构现有指标类,添加新的实现如DefaultLongCounter和DefaultLongHistogram 2. 引入OpenTelemetry依赖,添加NOP实现类用于默认行为 3. 更新Counter、Gauge和Histogram接口,使其与OpenTelemetry兼容 4. 创建EventBridgeMetricsManager用于统一管理指标 --- .../rocketmq/eventbridge/metrics/Counter.java | 12 ++-- .../metrics/DefaultLongCounter.java | 58 +++++++++++++++++++ ...mStatistics.java => DefaultLongGauge.java} | 27 +++++---- .../{View.java => DefaultLongHistogram.java} | 23 +++++++- .../metrics/EventBridgeMetricsManager.java | 29 ++++++++++ .../metrics/{Meter.java => Gauge.java} | 12 +--- .../eventbridge/metrics/Histogram.java | 8 +-- .../rocketmq/eventbridge/metrics/Metric.java | 4 +- .../metrics/otlp/NopLongCounter.java | 39 +++++++++++++ .../metrics/otlp/NopLongGauge.java | 39 +++++++++++++ .../metrics/otlp/NopLongHistogram.java | 39 +++++++++++++ .../otlp/NopObservableLongCounter.java | 26 +++++++++ metrics/metrics-prometheus/pom.xml | 21 +++++++ .../main/java/org/apache/rocketmq/Main.java | 7 +++ metrics/pom.xml | 9 ++- pom.xml | 8 +++ 16 files changed, 325 insertions(+), 36 deletions(-) create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/{HistogramStatistics.java => DefaultLongGauge.java} (58%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/{View.java => DefaultLongHistogram.java} (56%) create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/{Meter.java => Gauge.java} (84%) create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java create mode 100644 metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java create mode 100644 metrics/metrics-prometheus/pom.xml create mode 100644 metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java index 7c14287f..86030491 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java @@ -19,17 +19,15 @@ package org.apache.rocketmq.eventbridge.metrics; -public interface Counter extends Metric { +public interface Counter extends Metric { - void inc(); + default void inc(){} - void inc(long n); + default void inc(long n, P attachment){} - void dec(); + default void dec(){} - void dec(long n); - - long getCount(); + default void dec(long n, P attachment){} @Override default MetricType getMetricType() { diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java new file mode 100644 index 00000000..19324a37 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongCounter; + +public class DefaultLongCounter implements Counter { + + private LongCounter longCounter = new NopLongCounter(); + + private long count; + + @Override + public void inc() { + count++; + longCounter.add(count); + } + + @Override + public void inc(long n, Attributes attachment) { + longCounter.add(n, attachment); + } + + @Override + public void dec() { + count--; + longCounter.add(count); + } + + + @Override + public void dec(long n, Attributes attachment) { + longCounter.add(n, attachment); + } + + @Override + public void setInstrument(LongCounter instrument) { + this.longCounter = instrument; + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java similarity index 58% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java rename to metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java index deb36076..5f771082 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/HistogramStatistics.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java @@ -18,19 +18,26 @@ package org.apache.rocketmq.eventbridge.metrics; -public abstract class HistogramStatistics { +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongGauge; +import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongGauge; - public abstract double getQuantile(double quantile); +public class DefaultLongGauge implements Histogram { - public abstract long[] getValues(); + private LongGauge LongGauge = new NopLongGauge(); - public abstract int size(); + @Override + public void update(long value, Attributes attachment) { + LongGauge.set(value, attachment); + } - public abstract double getMean(); + @Override + public LongGauge getValue() { + return LongGauge; + } - public abstract double getStdDev(); - - public abstract long getMax(); - - public abstract long getMin(); + @Override + public void setInstrument(LongGauge instrument) { + this.LongGauge = instrument; + } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java similarity index 56% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java rename to metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java index 87a9ddaa..027e0622 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/View.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java @@ -18,9 +18,26 @@ package org.apache.rocketmq.eventbridge.metrics; -public interface View { +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongHistogram; +import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongHistogram; - int UPDATE_INTERVAL_SECONDS = 5; +public class DefaultLongHistogram implements Histogram { - void update(); + private LongHistogram longHistogram = new NopLongHistogram(); + + @Override + public void update(long value, Attributes attachment) { + longHistogram.record(value, attachment); + } + + @Override + public LongHistogram getValue() { + return longHistogram; + } + + @Override + public void setInstrument(LongHistogram instrument) { + this.longHistogram = instrument; + } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java new file mode 100644 index 00000000..77395195 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; + +import java.util.function.Supplier; + +public class EventBridgeMetricsManager { + + public static Supplier attributesBuilderSupplier = Attributes::builder; + + +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java similarity index 84% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java rename to metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java index 2298c665..18ec6273 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Meter.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java @@ -18,18 +18,12 @@ package org.apache.rocketmq.eventbridge.metrics; -public interface Meter extends Metric { +public interface Gauge extends Metric { - void markEvent(); - - void markEvent(long n); - - double getRate(); - - long getCount(); + default T getValue(){return null;} @Override default MetricType getMetricType() { - return MetricType.METER; + return MetricType.GAUGE; } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java index e3263459..4987c5ad 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java @@ -18,13 +18,11 @@ package org.apache.rocketmq.eventbridge.metrics; -public interface Histogram extends Metric { +public interface Histogram extends Metric { - void update(long value); + default void update(long value, P attachment){} - long getCount(); - - HistogramStatistics getStatistics(); + default R getValue(){return null;} @Override default MetricType getMetricType() { diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java index 0a140dd6..61b623cc 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java @@ -17,8 +17,10 @@ package org.apache.rocketmq.eventbridge.metrics; -public interface Metric { +public interface Metric { default MetricType getMetricType() { throw new UnsupportedOperationException("Custom metric type is not supported."); } + + void setInstrument(R instrument); } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java new file mode 100644 index 00000000..a3a2341c --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.metrics.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.context.Context; + +public class NopLongCounter implements LongCounter { + + @Override + public void add(long l) { + + } + + @Override + public void add(long l, Attributes attributes) { + + } + + @Override + public void add(long l, Attributes attributes, Context context) { + + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java new file mode 100644 index 00000000..8475c440 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.metrics.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongGauge; +import io.opentelemetry.context.Context; + +public class NopLongGauge implements LongGauge { + + @Override + public void set(long l) { + + } + + @Override + public void set(long l, Attributes attributes) { + + } + + @Override + public void set(long l, Attributes attributes, Context context) { + + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java new file mode 100644 index 00000000..e9afbdb8 --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.metrics.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.context.Context; + +public class NopLongHistogram implements LongHistogram { + + @Override + public void record(long l) { + + } + + @Override + public void record(long l, Attributes attributes) { + + } + + @Override + public void record(long l, Attributes attributes, Context context) { + + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java new file mode 100644 index 00000000..f37374dd --- /dev/null +++ b/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.metrics.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.context.Context; + +public class NopObservableLongCounter implements ObservableLongCounter { + +} diff --git a/metrics/metrics-prometheus/pom.xml b/metrics/metrics-prometheus/pom.xml new file mode 100644 index 00000000..6e0b4e25 --- /dev/null +++ b/metrics/metrics-prometheus/pom.xml @@ -0,0 +1,21 @@ + + + 4.0.0 + + org.apache.rocketmq + metrics + 1.1.0 + ../pom.xml + + + rocketmq-eventbridge-metrics-prometheus + + + 17 + 17 + UTF-8 + + + \ No newline at end of file diff --git a/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java b/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java new file mode 100644 index 00000000..8a38164e --- /dev/null +++ b/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/metrics/pom.xml b/metrics/pom.xml index 57f496ce..7be23ace 100644 --- a/metrics/pom.xml +++ b/metrics/pom.xml @@ -4,8 +4,8 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.apache.rocketmq rocketmq-eventbridge + org.apache.rocketmq 1.1.0 ../pom.xml @@ -14,6 +14,7 @@ pom metrics-core + metrics-prometheus @@ -21,5 +22,11 @@ 17 UTF-8 + + + io.opentelemetry + opentelemetry-sdk + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0beae68f..d2ca01bd 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,7 @@ 5.1.0 8.5.7 1.18.20 + 1.39.0 @@ -318,6 +319,13 @@ ${rocketmq.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + junit From 8513ed017dc74f4dabce5cedf819732a031c47d1 Mon Sep 17 00:00:00 2001 From: wangkai Date: Sun, 22 Dec 2024 23:02:24 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=8C=87=E6=A0=87?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=B9=B6=E8=BF=81=E7=A7=BB=E5=88=B0=E5=9F=BA?= =?UTF-8?q?=E7=A1=80=E8=AE=BE=E6=96=BD=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 将指标相关类从metrics模块移至infrastructure模块 2. 更新指标类以支持OpenTelemetry,包括Counter、Gauge、Histogram等 3. 添加MetricsCollectorFactory和MetricConfig用于配置和初始化指标收集器 --- domain/pom.xml | 2 +- infrastructure/pom.xml | 23 ++ .../infrastructure/metric}/Counter.java | 12 +- .../infrastructure/metric/DoubleGauge.java | 27 ++- .../metric/DoubleHistogram.java | 47 ++++ .../metric/DoubleObserverGauge.java | 48 ++++ .../metric/EventBridgeMetricsManager.java | 82 +++++++ .../infrastructure/metric}/Gauge.java | 9 +- .../infrastructure/metric}/Histogram.java | 6 +- .../infrastructure/metric/LongCounter.java | 31 +-- .../metric/LongObserverCounter.java | 48 ++++ .../infrastructure/metric}/Metric.java | 4 +- .../infrastructure/metric/MetricConfig.java | 52 ++++ .../infrastructure/metric}/MetricType.java | 2 +- .../metric/MetricsCollectorFactory.java | 226 ++++++++++++++++++ .../metric/MetricsExporterType.java | 52 ++++ .../infrastructure/metric/MonitorFactory.java | 31 --- .../metric/ObservableCounter.java | 18 +- .../metric/ObservableGauge.java | 23 +- .../metric/otlp/NopDoubleGauge.java | 12 +- .../metric/otlp/NopDoubleHistogram.java | 12 +- .../metric}/otlp/NopLongCounter.java | 2 +- .../metric/otlp/NopObservableDoubleGauge.java | 33 +++ .../metric/otlp/NopObservableLongCounter.java | 14 +- metrics/metrics-core/pom.xml | 20 -- metrics/metrics-prometheus/pom.xml | 21 -- .../main/java/org/apache/rocketmq/Main.java | 7 - metrics/pom.xml | 32 --- pom.xml | 33 ++- .../config/MetricsCollectorConfig.java | 36 +++ 30 files changed, 763 insertions(+), 202 deletions(-) rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/Counter.java (78%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java (55%) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/Gauge.java (82%) rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/Histogram.java (84%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java (59%) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/Metric.java (91%) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/MetricType.java (93%) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java delete mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java (71%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java (56%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java (74%) rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java (73%) rename {metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric}/otlp/NopLongCounter.java (94%) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java rename metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java => infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java (71%) delete mode 100644 metrics/metrics-core/pom.xml delete mode 100644 metrics/metrics-prometheus/pom.xml delete mode 100644 metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java delete mode 100644 metrics/pom.xml create mode 100644 start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java diff --git a/domain/pom.xml b/domain/pom.xml index d0724819..735d529d 100644 --- a/domain/pom.xml +++ b/domain/pom.xml @@ -23,7 +23,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure diff --git a/infrastructure/pom.xml b/infrastructure/pom.xml index 661455d9..591fb254 100644 --- a/infrastructure/pom.xml +++ b/infrastructure/pom.xml @@ -33,6 +33,29 @@ org.springframework.boot spring-boot-starter-webflux + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-exporter-otlp + + + + io.opentelemetry + opentelemetry-exporter-logging + + + + io.opentelemetry + opentelemetry-exporter-prometheus + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + \ No newline at end of file diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java similarity index 78% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java index 86030491..5bdc69ec 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Counter.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Counter.java @@ -16,18 +16,14 @@ */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; -public interface Counter extends Metric { +public interface Counter extends Metric { - default void inc(){} + default void inc(P2 attachment){} - default void inc(long n, P attachment){} - - default void dec(){} - - default void dec(long n, P attachment){} + default void inc(P1 n, P2 attachment){} @Override default MetricType getMetricType() { diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java similarity index 55% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java index 027e0622..eac7b83f 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongHistogram.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java @@ -16,28 +16,33 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongHistogram; -import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongHistogram; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleGauge; -public class DefaultLongHistogram implements Histogram { - private LongHistogram longHistogram = new NopLongHistogram(); +public class DoubleGauge implements Gauge { + + private io.opentelemetry.api.metrics.DoubleGauge doubleGauge = new NopDoubleGauge(); + + @Override + public void set(Double value, Attributes attachment) { + doubleGauge.set(value, attachment); + } @Override - public void update(long value, Attributes attachment) { - longHistogram.record(value, attachment); + public io.opentelemetry.api.metrics.DoubleGauge getValue() { + return doubleGauge; } @Override - public LongHistogram getValue() { - return longHistogram; + public String getMetricName() { + return "DoubleGauge"; } @Override - public void setInstrument(LongHistogram instrument) { - this.longHistogram = instrument; + public void setInstrument(io.opentelemetry.api.metrics.DoubleGauge instrument) { + this.doubleGauge = instrument; } } diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java new file mode 100644 index 00000000..1aaf9e94 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleHistogram.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleHistogram; + +public class DoubleHistogram implements Histogram { + + private io.opentelemetry.api.metrics.DoubleHistogram doubleHistogram = new NopDoubleHistogram(); + + @Override + public void update(Double value, Attributes attachment) { + doubleHistogram.record(value, attachment); + } + + @Override + public io.opentelemetry.api.metrics.DoubleHistogram getValue() { + return doubleHistogram; + } + + @Override + public String getMetricName() { + return "LongHistogram"; + } + + @Override + public void setInstrument(io.opentelemetry.api.metrics.DoubleHistogram instrument) { + this.doubleHistogram = instrument; + } +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java new file mode 100644 index 00000000..e0bbc7fc --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleObserverGauge.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopObservableDoubleGauge; + +public class DoubleObserverGauge implements ObservableGauge { + + private io.opentelemetry.api.metrics.ObservableDoubleMeasurement observableDoubleGauge = new NopObservableDoubleGauge(); + + @Override + public void set(Double value, Attributes attachment) { + observableDoubleGauge.record(value, attachment); + } + + @Override + public ObservableDoubleMeasurement getValue() { + return observableDoubleGauge; + } + + + @Override + public String getMetricName() { + return "observableDoubleGauge"; + } + + @Override + public void setInstrument(ObservableDoubleMeasurement instrument) { + this.observableDoubleGauge = instrument; + } +} \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java new file mode 100644 index 00000000..aec83c03 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.Meter; +import lombok.experimental.UtilityClass; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +@UtilityClass +public class EventBridgeMetricsManager { + + public static Supplier attributesBuilderSupplier = Attributes::builder; + + public final static List metrics = new ArrayList<>(); + + public static Counter httpCounter = new LongCounter(); + + public static Gauge totalLatencyGauge = new DoubleGauge();; + + public static Histogram totalLatencyHistogram = new DoubleHistogram(); + + public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge(); + + + private final static Map LABEL_MAP = new HashMap<>(); + + static { + metrics.add(httpCounter); + metrics.add(totalLatencyHistogram); + metrics.add(totalLatencyGauge); + metrics.add(observableDoubleGauge); + } + + public static AttributesBuilder newAttributesBuilder() { + AttributesBuilder attributesBuilder; + if (attributesBuilderSupplier == null) { + attributesBuilderSupplier = Attributes::builder; + } + attributesBuilder = attributesBuilderSupplier.get(); + LABEL_MAP.forEach(attributesBuilder::put); + return attributesBuilder; + } + + public static void initMetricsView(Meter brokerMeter) { + for(Metric metric : metrics) { + if (metric instanceof Counter) { + metric.setInstrument(brokerMeter.counterBuilder(metric.getMetricName()).build());; + } else if (metric instanceof ObservableCounter) { + metric.setInstrument(brokerMeter.counterBuilder(metric.getMetricName()).buildObserver()); + } else if (metric instanceof Histogram) { + metric.setInstrument(brokerMeter.histogramBuilder(metric.getMetricName()).build()); + } else if (metric instanceof Gauge) { + metric.setInstrument(brokerMeter.gaugeBuilder(metric.getMetricName()).build()); + } else if (metric instanceof ObservableGauge) { + metric.setInstrument(brokerMeter.gaugeBuilder(metric.getMetricName()).buildObserver()); + } + } + + } + +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java similarity index 82% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java index 18ec6273..bbac4b99 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Gauge.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Gauge.java @@ -16,11 +16,14 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; -public interface Gauge extends Metric { - default T getValue(){return null;} +public interface Gauge extends Metric { + + default N getValue(){return null;} + + void set(P1 l, P2 attributes); @Override default MetricType getMetricType() { diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java similarity index 84% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java index 4987c5ad..35477396 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Histogram.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Histogram.java @@ -16,11 +16,11 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; -public interface Histogram extends Metric { +public interface Histogram extends Metric { - default void update(long value, P attachment){} + default void update(P1 value, P2 attachment){} default R getValue(){return null;} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java similarity index 59% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java index 19324a37..4644b948 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongCounter.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java @@ -16,43 +16,32 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongCounter; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopLongCounter; -public class DefaultLongCounter implements Counter { +public class LongCounter implements Counter { - private LongCounter longCounter = new NopLongCounter(); - - private long count; + private io.opentelemetry.api.metrics.LongCounter longCounter = new NopLongCounter(); @Override - public void inc() { - count++; - longCounter.add(count); + public void inc(Attributes attachment) { + longCounter.add(1, attachment); } @Override - public void inc(long n, Attributes attachment) { + public void inc(Long n, Attributes attachment) { longCounter.add(n, attachment); } @Override - public void dec() { - count--; - longCounter.add(count); - } - - - @Override - public void dec(long n, Attributes attachment) { - longCounter.add(n, attachment); + public String getMetricName() { + return "longCounter"; } @Override - public void setInstrument(LongCounter instrument) { + public void setInstrument(io.opentelemetry.api.metrics.LongCounter instrument) { this.longCounter = instrument; } } diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java new file mode 100644 index 00000000..41a7aeed --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongObserverCounter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopObservableLongCounter; + +public class LongObserverCounter implements ObservableCounter { + + private io.opentelemetry.api.metrics.ObservableLongMeasurement observableLongCounter = new NopObservableLongCounter(); + + @Override + public void inc(Attributes attachment) { + observableLongCounter.record(1, attachment); + } + + @Override + public void inc(Long n, Attributes attachment) { + observableLongCounter.record(n, attachment); + } + + @Override + public String getMetricName() { + return "observableLongCounter"; + } + + @Override + public void setInstrument(ObservableLongMeasurement instrument) { + this.observableLongCounter = instrument; + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java similarity index 91% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java index 61b623cc..b532b782 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/Metric.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/Metric.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; public interface Metric { default MetricType getMetricType() { throw new UnsupportedOperationException("Custom metric type is not supported."); } + String getMetricName(); + void setInstrument(R instrument); } diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java new file mode 100644 index 00000000..2e3eb1a6 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Data +@ConfigurationProperties(prefix="metrics") +@EnableConfigurationProperties +@Configuration +public class MetricConfig { + + private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE; + + private String labels; + + private boolean inDelta = false; + + private int otelCardinalityLimit = 50 * 1000; + + private String grpcExporterTarget = ""; + + private String grpcExporterHeader = ""; + + private long grpcExporterTimeOutInMills = 3 * 1000; + + private long grpcExporterIntervalInMills = 60 * 1000; + + private long loggingExporterIntervalInMills = 10 * 1000; + + private int promExporterPort = 5557; + + private String promExporterHost = "localhost"; +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java similarity index 93% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java index 06e9f679..a01b88b3 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/MetricType.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; public enum MetricType { COUNTER, diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java new file mode 100644 index 00000000..11337278 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +import com.google.common.base.Splitter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.resources.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.bridge.SLF4JBridgeHandler; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +@Slf4j +public class MetricsCollectorFactory { + public static final String OPEN_TELEMETRY_METER_NAME = "eventbridge-meter"; + + public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; + + public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total"; + + public static final String EVENTRULE_FILTER_EVENTS_TOTAL = "eventbridge_eventrule_filter_events_total"; + + public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds"; + + public static final String EVENTRULE_TRIGGER_LATENCY = "eventbridge_eventrule_trigger_latency"; + + + private final static Map LABEL_MAP = new HashMap<>(); + + private static class MetricsCollectorHolder{ + static MetricsCollectorFactory instance = new MetricsCollectorFactory(); + } + + public static MetricsCollectorFactory getInstance(){ + return MetricsCollectorFactory.MetricsCollectorHolder.instance; + } + + + private MetricsCollectorFactory() { + + } + + public void start(MetricConfig metricConfig) { + + if (!checkConfig(metricConfig)) { + log.error("check metrics config failed, will not export metrics"); + return; + } + + MetricsExporterType metricsExporterType = metricConfig.getMetricsExporterType(); + if (metricsExporterType == MetricsExporterType.DISABLE) { + return; + } + + SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder() + .setResource(Resource.empty()); + PeriodicMetricReader periodicMetricReader; + if (metricsExporterType == MetricsExporterType.OTLP_GRPC) { + String endpoint = metricConfig.getGrpcExporterTarget(); + if (!endpoint.startsWith("http")) { + endpoint = "https://" + endpoint; + } + OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() + .setEndpoint(endpoint) + .setTimeout(metricConfig.getGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) + .setAggregationTemporalitySelector(type -> { + if (metricConfig.isInDelta() && + (type == InstrumentType.COUNTER || type == InstrumentType.OBSERVABLE_COUNTER || type == InstrumentType.HISTOGRAM)) { + return AggregationTemporality.DELTA; + } + return AggregationTemporality.CUMULATIVE; + }); + + String headers = metricConfig.getGrpcExporterHeader(); + if (StringUtils.isNotBlank(headers)) { + Map headerMap = new HashMap<>(); + List kvPairs = Splitter.on(',').omitEmptyStrings().splitToList(headers); + for (String item : kvPairs) { + String[] split = item.split(":"); + if (split.length != 2) { + log.warn("metricsGrpcExporterHeader is not valid: {}", headers); + continue; + } + headerMap.put(split[0], split[1]); + } + headerMap.forEach(metricExporterBuilder::addHeader); + } + + OtlpGrpcMetricExporter metricExporter = metricExporterBuilder.build(); + + periodicMetricReader = PeriodicMetricReader.builder(metricExporter) + .setInterval(metricConfig.getGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + + providerBuilder.registerMetricReader(periodicMetricReader); + } + PrometheusHttpServer prometheusHttpServer; + if (metricsExporterType == MetricsExporterType.PROM) { + String promExporterHost = metricConfig.getPromExporterHost(); + prometheusHttpServer = PrometheusHttpServer.builder() + .setHost(promExporterHost) + .setPort(metricConfig.getPromExporterPort()) + .build(); + providerBuilder.registerMetricReader(prometheusHttpServer); + } + + MetricExporter loggingMetricExporter; + if (metricsExporterType == MetricsExporterType.LOG) { + SLF4JBridgeHandler.removeHandlersForRootLogger(); + SLF4JBridgeHandler.install(); + loggingMetricExporter = OtlpJsonLoggingMetricExporter.create(metricConfig.isInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE); + Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST); + periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter) + .setInterval(metricConfig.getLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS) + .build(); + providerBuilder.registerMetricReader(periodicMetricReader); + } + + registerMetricsView(providerBuilder); + + Meter brokerMeter = OpenTelemetrySdk.builder() + .setMeterProvider(providerBuilder.build()) + .build() + .getMeter(OPEN_TELEMETRY_METER_NAME); + EventBridgeMetricsManager.initMetricsView(brokerMeter); + } + + public static boolean checkConfig(MetricConfig metricConfig) { + if (metricConfig == null) { + return false; + } + MetricsExporterType exporterType = metricConfig.getMetricsExporterType(); + if (!exporterType.isEnable()) { + return false; + } + + switch (exporterType) { + case OTLP_GRPC: + return StringUtils.isNotBlank(metricConfig.getGrpcExporterTarget()); + case PROM: + return true; + case LOG: + return true; + } + return false; + } + + private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { + // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M + List messageSizeBuckets = Arrays.asList( + 1d * 1024, //1KB + 4d * 1024, //4KB + 512d * 1024, //512KB + 1d * 1024 * 1024, //1MB + 2d * 1024 * 1024, //2MB + 4d * 1024 * 1024 //4MB + ); + InstrumentSelector messageSizeSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_MESSAGE_SIZE) + .build(); + + View messageSizeView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) + .build(); + providerBuilder.registerView(messageSizeSelector, messageSizeView); + + List rpcCostTimeBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(3).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(7).toMillis(), + (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofSeconds(1).toMillis(), + (double) Duration.ofSeconds(2).toMillis(), + (double) Duration.ofSeconds(3).toMillis() + ); + InstrumentSelector selector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(EVENTRULE_TRIGGER_LATENCY) + .build(); + View view = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) + .build(); + + providerBuilder.registerView(selector, view); + } + +} diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java new file mode 100644 index 00000000..5e071579 --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsExporterType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public enum MetricsExporterType { + DISABLE(0), + OTLP_GRPC(1), + PROM(2), + LOG(3); + + private final int value; + + MetricsExporterType(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static MetricsExporterType valueOf(int value) { + switch (value) { + case 1: + return OTLP_GRPC; + case 2: + return PROM; + case 3: + return LOG; + default: + return DISABLE; + } + } + + public boolean isEnable() { + return this.value > 0; + } +} \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java deleted file mode 100644 index 6311d2a9..00000000 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MonitorFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.infrastructure.metric; - -import java.util.Map; - -public class MonitorFactory { - - public void createSpan(Map content){ - - } - - public void finishSpan(Map content){ - - } -} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java similarity index 71% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java index f37374dd..77a19c86 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopObservableLongCounter.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableCounter.java @@ -14,13 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics.otlp; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.ObservableLongCounter; -import io.opentelemetry.context.Context; -public class NopObservableLongCounter implements ObservableLongCounter { +package org.apache.rocketmq.eventbridge.infrastructure.metric; + +public interface ObservableCounter extends Metric { + + default void inc(P2 attachment){} + + default void inc(P1 n, P2 attachment){} + + @Override + default MetricType getMetricType() { + return MetricType.COUNTER; + } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java similarity index 56% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java index 5f771082..1e31c98d 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/DefaultLongGauge.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/ObservableGauge.java @@ -16,28 +16,17 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongGauge; -import org.apache.rocketmq.eventbridge.metrics.otlp.NopLongGauge; -public class DefaultLongGauge implements Histogram { +public interface ObservableGauge extends Metric { - private LongGauge LongGauge = new NopLongGauge(); + default N getValue(){return null;} - @Override - public void update(long value, Attributes attachment) { - LongGauge.set(value, attachment); - } - - @Override - public LongGauge getValue() { - return LongGauge; - } + void set(P1 l, P2 attributes); @Override - public void setInstrument(LongGauge instrument) { - this.LongGauge = instrument; + default MetricType getMetricType() { + return MetricType.GAUGE; } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java similarity index 74% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java index 8475c440..8255533b 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongGauge.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleGauge.java @@ -14,26 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics.otlp; +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongGauge; +import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.context.Context; -public class NopLongGauge implements LongGauge { +public class NopDoubleGauge implements DoubleGauge { @Override - public void set(long l) { + public void set(double l) { } @Override - public void set(long l, Attributes attributes) { + public void set(double l, Attributes attributes) { } @Override - public void set(long l, Attributes attributes, Context context) { + public void set(double l, Attributes attributes, Context context) { } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java similarity index 73% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java index e9afbdb8..34a51c29 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongHistogram.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopDoubleHistogram.java @@ -14,26 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics.otlp; +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.context.Context; -public class NopLongHistogram implements LongHistogram { +public class NopDoubleHistogram implements DoubleHistogram { @Override - public void record(long l) { + public void record(double l) { } @Override - public void record(long l, Attributes attributes) { + public void record(double l, Attributes attributes) { } @Override - public void record(long l, Attributes attributes, Context context) { + public void record(double l, Attributes attributes, Context context) { } } diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java similarity index 94% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java index a3a2341c..05c66dad 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/otlp/NopLongCounter.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopLongCounter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics.otlp; +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java new file mode 100644 index 00000000..3d5f86dd --- /dev/null +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableDoubleGauge.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; + +public class NopObservableDoubleGauge implements ObservableDoubleMeasurement { + + @Override + public void record(double l) { + + } + + @Override + public void record(double l, Attributes attributes) { + + } +} diff --git a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java similarity index 71% rename from metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java rename to infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java index 77395195..70ab74b8 100644 --- a/metrics/metrics-core/src/main/java/org/apache/rocketmq/eventbridge/metrics/EventBridgeMetricsManager.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/otlp/NopObservableLongCounter.java @@ -14,16 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.eventbridge.metrics; +package org.apache.rocketmq.eventbridge.infrastructure.metric.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; -import java.util.function.Supplier; +public class NopObservableLongCounter implements ObservableLongMeasurement{ -public class EventBridgeMetricsManager { + @Override + public void record(long v) { - public static Supplier attributesBuilderSupplier = Attributes::builder; + } + @Override + public void record(long v, Attributes attributes) { + } } diff --git a/metrics/metrics-core/pom.xml b/metrics/metrics-core/pom.xml deleted file mode 100644 index 649ef68d..00000000 --- a/metrics/metrics-core/pom.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - 4.0.0 - - org.apache.rocketmq - metrics - 1.1.0 - ../pom.xml - - - rocketmq-eventbridge-metrics-core - - - 17 - 17 - UTF-8 - - \ No newline at end of file diff --git a/metrics/metrics-prometheus/pom.xml b/metrics/metrics-prometheus/pom.xml deleted file mode 100644 index 6e0b4e25..00000000 --- a/metrics/metrics-prometheus/pom.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - 4.0.0 - - org.apache.rocketmq - metrics - 1.1.0 - ../pom.xml - - - rocketmq-eventbridge-metrics-prometheus - - - 17 - 17 - UTF-8 - - - \ No newline at end of file diff --git a/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java b/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java deleted file mode 100644 index 8a38164e..00000000 --- a/metrics/metrics-prometheus/src/main/java/org/apache/rocketmq/Main.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.rocketmq; - -public class Main { - public static void main(String[] args) { - System.out.println("Hello world!"); - } -} \ No newline at end of file diff --git a/metrics/pom.xml b/metrics/pom.xml deleted file mode 100644 index 7be23ace..00000000 --- a/metrics/pom.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - 4.0.0 - - rocketmq-eventbridge - org.apache.rocketmq - 1.1.0 - ../pom.xml - - - metrics - pom - - metrics-core - metrics-prometheus - - - - 17 - 17 - UTF-8 - - - - io.opentelemetry - opentelemetry-sdk - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index d2ca01bd..8df75ef6 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ 3.9.0 2.3.0 1.10.0 + 4.5.0-M2 2.13.0 5.9.2 2.9.3 @@ -97,6 +98,7 @@ 8.5.7 1.18.20 1.39.0 + 1.39.0-alpha @@ -110,7 +112,6 @@ supports/eventbridge-connect-file test dist - metrics @@ -284,6 +285,12 @@ commons-text ${apache.commons-text.version} + + + org.apache.commons + commons-collections4 + ${apache.commons-collections4.version} + com.github.ben-manes.caffeine caffeine @@ -326,6 +333,30 @@ ${opentelemetry.version} + + io.opentelemetry + opentelemetry-exporter-otlp + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-logging + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-logging-otlp + ${opentelemetry.version} + + + + io.opentelemetry + opentelemetry-exporter-prometheus + ${opentelemetry.exporter.prometheus.version} + + junit diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java b/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java new file mode 100644 index 00000000..2798b174 --- /dev/null +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/config/MetricsCollectorConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.config; + +import org.apache.rocketmq.eventbridge.infrastructure.metric.MetricConfig; +import org.apache.rocketmq.eventbridge.infrastructure.metric.MetricsCollectorFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class MetricsCollectorConfig implements InitializingBean { + + @Autowired + private MetricConfig metricConfig; + + @Override + public void afterPropertiesSet() throws Exception { + MetricsCollectorFactory.getInstance().start(metricConfig); + } +} From 6b67641e082d08bd05c97722460d48d0dbac9ccf Mon Sep 17 00:00:00 2001 From: wangkai Date: Sun, 5 Jan 2025 16:28:45 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0EventBridge=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E5=B8=B8=E9=87=8F=E5=92=8C=E6=9B=B4=E6=96=B0=E6=8C=87?= =?UTF-8?q?=E6=A0=87=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 新增EventBridgeMetricsConstant类定义指标标签常量 2. 更新DoubleGauge、DoubleHistogram等类,添加metricName字段 3. 在EventBridgeMetricsManager中定义具体的指标实例 --- .../infrastructure/metric/DoubleGauge.java | 8 +++- .../metric/DoubleHistogram.java | 8 +++- .../metric/DoubleObserverGauge.java | 9 +++- .../metric/EventBridgeMetricsConstant.java | 48 +++++++++++++++++++ .../metric/EventBridgeMetricsManager.java | 28 ++++++++--- .../infrastructure/metric/LongCounter.java | 9 +++- .../metric/LongObserverCounter.java | 8 +++- 7 files changed, 106 insertions(+), 12 deletions(-) create mode 100644 infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java index eac7b83f..894bdacf 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/DoubleGauge.java @@ -26,6 +26,12 @@ public class DoubleGauge implements Gauge metrics = new ArrayList<>(); - public static Counter httpCounter = new LongCounter(); + public static Histogram eventbridgePutEventsLatency = new DoubleHistogram("eventbridge_putevents_latency"); - public static Gauge totalLatencyGauge = new DoubleGauge();; + public static Histogram eventbridgePutEventsSize = new DoubleHistogram("eventbridge_putevents_size"); - public static Histogram totalLatencyHistogram = new DoubleHistogram(); + public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge("eventbridge_event_rule_latency"); - public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge(); + public static Counter eventbridgeEventRuleLagEventsTotal = new LongCounter("eventbridge_event_rule_lag_events_total"); + public static Counter eventbridgeEventsTransferInTotal = new LongCounter("eventbridge_events_transfer_in_total"); + + public static Counter eventbridgeEventsTransferOutTotal = new LongCounter("eventbridge_events_transfer_out_total"); + + public static ObservableGauge eventbridgeEventsTransferLatency = new DoubleObserverGauge("eventbridge_events_transfer_latency"); + + public static Histogram eventbridgeEventsTriggerLatency = new DoubleHistogram("eventbridge_events_trigger_latency"); + + public static ObservableGauge eventbridgeEventsLatency = new DoubleObserverGauge("eventbridge_events_latency"); private final static Map LABEL_MAP = new HashMap<>(); static { - metrics.add(httpCounter); - metrics.add(totalLatencyHistogram); - metrics.add(totalLatencyGauge); + metrics.add(eventbridgePutEventsLatency); + metrics.add(eventbridgePutEventsSize); metrics.add(observableDoubleGauge); + metrics.add(eventbridgeEventRuleLagEventsTotal); + metrics.add(eventbridgeEventsTransferInTotal); + metrics.add(eventbridgeEventsTransferOutTotal); + metrics.add(eventbridgeEventsTransferLatency); + metrics.add(eventbridgeEventsTriggerLatency); + metrics.add(eventbridgeEventsLatency); } public static AttributesBuilder newAttributesBuilder() { diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java index 4644b948..eebe6443 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/LongCounter.java @@ -25,6 +25,13 @@ public class LongCounter implements Counter Date: Sun, 9 Mar 2025 16:52:31 +0800 Subject: [PATCH 8/8] Add metrics collection and monitoring for EventBridge components MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 更新依赖从common到infrastructure模块 2. 添加Prometheus指标导出功能 3. 在事件处理流程中添加指标收集点(PutEvents、Filter、Transform、Trigger) 4. 配置自定义指标桶和标签 DevPilot: I've analyzed the changes and created a commit message that summarizes the key modifications. The message highlights the addition of metrics collection throughout the EventBridge components, the dependency updates, and the configuration of Prometheus for metrics export. The message is in Chinese as requested and follows best practices for commit messages. --- .../runtime/boot/EventRuleTransfer.java | 34 +++++++- .../runtime/boot/EventTargetTrigger.java | 26 ++++++ .../rocketmq/impl/DefaultSendCallback.java | 6 ++ .../impl/RocketMQEventDataRepository.java | 24 +++++- infrastructure/pom.xml | 4 + .../metric/EventBridgeMetricsConstant.java | 18 ++++ .../metric/EventBridgeMetricsManager.java | 28 +++++-- .../infrastructure/metric/MetricConfig.java | 2 + .../metric/MetricsCollectorFactory.java | 83 +++++++++++-------- pom.xml | 7 ++ start/pom.xml | 20 +++++ .../src/main/resources/application.properties | 5 +- .../connect-eventbridge-transform/pom.xml | 2 +- .../eventbridge/EventBridgeTransform.java | 25 ++++++ supports/connect-filter-transform/pom.xml | 2 +- .../EventBridgeFilterTransform.java | 15 ++++ test/rocketmq-eventbridge-e2etest/pom.xml | 12 +++ .../e2etest/controller/PostJsonExample.java | 42 ++++++++++ 18 files changed, 306 insertions(+), 49 deletions(-) create mode 100644 test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index 477e4b4e..2644308b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.boot; -import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; @@ -25,6 +24,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.PostConstruct; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; @@ -33,6 +33,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,9 +92,11 @@ public void run() { TransformEngine curTransformEngine = latestTransformMap.get(runnerName); List curEventRecords = eventRecordMap.get(runnerName); curEventRecords.forEach(pullRecord -> { + AtomicReference resultException = new AtomicReference<>(); CompletableFuture transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { LOGGER.error("transfer do transform event record failed, stackTrace-", exception); + resultException.set(exception); errorHandler.handle(pullRecord, exception); return null; }) @@ -103,6 +107,7 @@ public void run() { offsetManager.commit(pullRecord); } }); + exportMetrics(pullRecord, runnerName, resultException); completableFutures.add(transformFuture); }); } @@ -117,6 +122,33 @@ public void run() { } } + private static void exportMetrics(ConnectRecord connectRecord, String ruleName, AtomicReference resultException) { + String status = "success"; + if (resultException.get() != null) { + status = "failed"; + resultException.set(null); + } + EventBridgeMetricsManager.observableDoubleGauge.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, status) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build()); + + EventBridgeMetricsManager.eventbridgeEventRuleLagEventsTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, status) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build()); + } + + + @Override public void start() { thread.start(); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index b9d175b9..fcda2112 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -31,6 +31,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,15 +76,39 @@ public void run() { try { sinkTask.put(triggerRecords); offsetManager.commit(triggerRecords); + exportMetrics(triggerRecords.get(0), "success"); } catch (Exception exception) { LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception); triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception)); + exportMetrics(triggerRecords.get(0), "failed"); } }); } } } + private static void exportMetrics(ConnectRecord connectRecord, String status) { + EventBridgeMetricsManager.eventbridgeEventsTriggerLatency.update(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + + EventBridgeMetricsManager.eventbridgeEventsLatency.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + @Override public String getServiceName() { return EventTargetTrigger.class.getSimpleName(); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java index 35bc7ef8..892aa6b5 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/DefaultSendCallback.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl; +import lombok.Getter; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback; @@ -29,6 +30,9 @@ public class DefaultSendCallback implements SendCallback { PutEventsResponseEntry entry = new PutEventsResponseEntry(); + @Getter + private String status; + public DefaultSendCallback(PutEventCallback putEventCallback) { this.putEventCallback = putEventCallback; } @@ -37,6 +41,7 @@ public DefaultSendCallback(PutEventCallback putEventCallback) { public void onSuccess(SendResult sendResult) { entry.setEventId(sendResult.getMsgId()); entry.setErrorCode(DefaultErrorCode.Success.getCode()); + status = DefaultErrorCode.Success.getCode(); putEventCallback.endProcess(entry); } @@ -44,6 +49,7 @@ public void onSuccess(SendResult sendResult) { public void onException(Throwable throwable) { entry.setErrorCode(DefaultErrorCode.InternalError.getCode()); entry.setErrorMessage(throwable.getMessage()); + status = DefaultErrorCode.InternalError.getCode(); putEventCallback.endProcess(entry); } } diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java index 5b1d796c..3e8b12b3 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java @@ -30,6 +30,8 @@ import org.apache.rocketmq.eventbridge.domain.storage.EventDataRepository; import org.apache.rocketmq.eventbridge.event.EventBridgeEvent; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Repository; @@ -74,14 +76,34 @@ public boolean putEvent(String accountId, String eventBusName, EventBridgeEvent PutEventCallback putEventCallback) { String topicName = this.getTopicName(accountId, eventBusName); Message msg = eventDataOnRocketMQConnectAPI.converter(accountId, topicName, eventBridgeEvent); + DefaultSendCallback sendCallback = new DefaultSendCallback(putEventCallback); try { - producer.send(msg, new DefaultSendCallback(putEventCallback), 1000L); + producer.send(msg, sendCallback, 1000L); + exportMetrics(accountId, eventBusName, eventBridgeEvent, sendCallback); } catch (Throwable e) { throw new EventBridgeException(EventBridgeErrorCode.InternalError, e); } return true; } + private static void exportMetrics(String accountId, String eventBusName, EventBridgeEvent eventBridgeEvent, DefaultSendCallback sendCallback) { + EventBridgeMetricsManager.eventbridgePutEventsLatency.update(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build()); + + EventBridgeMetricsManager.eventbridgePutEventsSize.update(Double.valueOf(String.valueOf(eventBridgeEvent.getData().length)), + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString()) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build()); + } + @Override public String getEventBusPersistentContext(String accountId, String eventBusName) { EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName); diff --git a/infrastructure/pom.xml b/infrastructure/pom.xml index 591fb254..a50a9d91 100644 --- a/infrastructure/pom.xml +++ b/infrastructure/pom.xml @@ -56,6 +56,10 @@ io.opentelemetry opentelemetry-exporter-logging-otlp + + io.prometheus + simpleclient + \ No newline at end of file diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java index 4df3d377..8d74de26 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsConstant.java @@ -19,6 +19,24 @@ public class EventBridgeMetricsConstant { + public static final String HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY = "eventbridge_putevents_latency"; + + public static final String HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE = "eventbridge_putevents_size"; + + public static final String GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY = "eventbridge_event_rule_latency"; + + public static final String COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL = "eventbridge_event_rule_lag_events_total"; + + public static final String COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL = "eventbridge_events_transfer_in_total"; + + public static final String COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL = "eventbridge_events_transfer_out_total"; + + public static final String GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY = "eventbridge_events_transfer_latency"; + + public static final String HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY = "eventbridge_events_trigger_latency"; + + public static final String GAUGE_EVENTBRIDGE_EVENTS_LATENCY = "eventbridge_events_latency"; + //状态 public static final String LABEL_STATUS = "status"; diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java index d9bee7cb..988a0b00 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/EventBridgeMetricsManager.java @@ -27,6 +27,16 @@ import java.util.Map; import java.util.function.Supplier; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE; + @UtilityClass public class EventBridgeMetricsManager { @@ -34,23 +44,23 @@ public class EventBridgeMetricsManager { public final static List metrics = new ArrayList<>(); - public static Histogram eventbridgePutEventsLatency = new DoubleHistogram("eventbridge_putevents_latency"); + public static Histogram eventbridgePutEventsLatency = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY); - public static Histogram eventbridgePutEventsSize = new DoubleHistogram("eventbridge_putevents_size"); + public static Histogram eventbridgePutEventsSize = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE); - public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge("eventbridge_event_rule_latency"); + public static ObservableGauge observableDoubleGauge = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENT_RULE_LATENCY); - public static Counter eventbridgeEventRuleLagEventsTotal = new LongCounter("eventbridge_event_rule_lag_events_total"); + public static Counter eventbridgeEventRuleLagEventsTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENT_RULE_LAG_EVENTS_TOTAL); - public static Counter eventbridgeEventsTransferInTotal = new LongCounter("eventbridge_events_transfer_in_total"); + public static Counter eventbridgeEventsTransferInTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_IN_TOTAL); - public static Counter eventbridgeEventsTransferOutTotal = new LongCounter("eventbridge_events_transfer_out_total"); + public static Counter eventbridgeEventsTransferOutTotal = new LongCounter(COUNTER_EVENTBRIDGE_EVENTS_TRANSFER_OUT_TOTAL); - public static ObservableGauge eventbridgeEventsTransferLatency = new DoubleObserverGauge("eventbridge_events_transfer_latency"); + public static ObservableGauge eventbridgeEventsTransferLatency = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENTS_TRANSFER_LATENCY); - public static Histogram eventbridgeEventsTriggerLatency = new DoubleHistogram("eventbridge_events_trigger_latency"); + public static Histogram eventbridgeEventsTriggerLatency = new DoubleHistogram(HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY); - public static ObservableGauge eventbridgeEventsLatency = new DoubleObserverGauge("eventbridge_events_latency"); + public static ObservableGauge eventbridgeEventsLatency = new DoubleObserverGauge(GAUGE_EVENTBRIDGE_EVENTS_LATENCY); private final static Map LABEL_MAP = new HashMap<>(); diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java index 2e3eb1a6..c613c14e 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricConfig.java @@ -46,6 +46,8 @@ public class MetricConfig { private long loggingExporterIntervalInMills = 10 * 1000; + private int metricsOtelCardinalityLimit = 50 * 1000; + private int promExporterPort = 5557; private String promExporterHost = "localhost"; diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java index 11337278..15868480 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/metric/MetricsCollectorFactory.java @@ -30,9 +30,11 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder; import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.ViewBuilder; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.export.MetricExporter; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; import io.opentelemetry.sdk.resources.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -46,22 +48,13 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY; +import static org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant.HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE; + @Slf4j public class MetricsCollectorFactory { - public static final String OPEN_TELEMETRY_METER_NAME = "eventbridge-meter"; - - public static final String HISTOGRAM_MESSAGE_SIZE = "eventbridge_message_size"; - - public static final String EVENTBUS_IN_EVENTS_TOTAL = "eventbridge_eventbus_in_events_total"; - - public static final String EVENTRULE_FILTER_EVENTS_TOTAL = "eventbridge_eventrule_filter_events_total"; - - public static final String EVENTRULE_LATENCY_SECONDS = "eventbridge_eventrule_latency_seconds"; - - public static final String EVENTRULE_TRIGGER_LATENCY = "eventbridge_eventrule_trigger_latency"; - - - private final static Map LABEL_MAP = new HashMap<>(); + private static final String OPEN_TELEMETRY_METER_NAME = "eventbridge-meter"; private static class MetricsCollectorHolder{ static MetricsCollectorFactory instance = new MetricsCollectorFactory(); @@ -152,7 +145,7 @@ public void start(MetricConfig metricConfig) { providerBuilder.registerMetricReader(periodicMetricReader); } - registerMetricsView(providerBuilder); + registerMetricsView(providerBuilder, metricConfig); Meter brokerMeter = OpenTelemetrySdk.builder() .setMeterProvider(providerBuilder.build()) @@ -181,7 +174,28 @@ public static boolean checkConfig(MetricConfig metricConfig) { return false; } - private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { + private void registerMetricsView(SdkMeterProviderBuilder providerBuilder, MetricConfig metricConfig) { + + //putevents latency buckets + List puteventsLatencyBuckets = Arrays.asList( + (double) Duration.ofMillis(1).toMillis(), + (double) Duration.ofMillis(5).toMillis(), + (double) Duration.ofMillis(20).toMillis(), + (double) Duration.ofMillis(100).toMillis(), + (double) Duration.ofMillis(1000).toMillis(), + (double) Duration.ofMillis(5000).toMillis(), + (double) Duration.ofSeconds(10000).toMillis() + ); + InstrumentSelector puteventsLatencyBucketsSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM) + .setName(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_LATENCY) + .build(); + ViewBuilder puteventsLatencyBucketsView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(puteventsLatencyBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(puteventsLatencyBucketsView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(puteventsLatencyBucketsSelector, puteventsLatencyBucketsView.build()); + + // message size buckets, 1k, 4k, 512k, 1M, 2M, 4M List messageSizeBuckets = Arrays.asList( 1d * 1024, //1KB @@ -191,36 +205,35 @@ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) { 2d * 1024 * 1024, //2MB 4d * 1024 * 1024 //4MB ); + InstrumentSelector messageSizeSelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) - .setName(HISTOGRAM_MESSAGE_SIZE) + .setName(HISTOGRAM_EVENTBRIDGE_PUTEVENTS_SIZE) .build(); - View messageSizeView = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)) - .build(); - providerBuilder.registerView(messageSizeSelector, messageSizeView); + ViewBuilder messageSizeView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(messageSizeBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(messageSizeView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(messageSizeSelector, messageSizeView.build()); - List rpcCostTimeBuckets = Arrays.asList( + //events trigger latency + List eventsTriggerLatencyBuckets = Arrays.asList( (double) Duration.ofMillis(1).toMillis(), - (double) Duration.ofMillis(3).toMillis(), (double) Duration.ofMillis(5).toMillis(), - (double) Duration.ofMillis(7).toMillis(), - (double) Duration.ofMillis(10).toMillis(), + (double) Duration.ofMillis(20).toMillis(), (double) Duration.ofMillis(100).toMillis(), - (double) Duration.ofSeconds(1).toMillis(), - (double) Duration.ofSeconds(2).toMillis(), - (double) Duration.ofSeconds(3).toMillis() + (double) Duration.ofMillis(1000).toMillis(), + (double) Duration.ofMillis(5000).toMillis(), + (double) Duration.ofSeconds(10000).toMillis() ); - InstrumentSelector selector = InstrumentSelector.builder() + InstrumentSelector eventsTriggerLatencySelector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) - .setName(EVENTRULE_TRIGGER_LATENCY) + .setName(HISTOGRAM_EVENTBRIDGE_EVENTS_TRIGGER_LATENCY) .build(); - View view = View.builder() - .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets)) - .build(); - - providerBuilder.registerView(selector, view); + ViewBuilder eventsTriggerLatencyView = View.builder() + .setAggregation(Aggregation.explicitBucketHistogram(eventsTriggerLatencyBuckets)); + SdkMeterProviderUtil.setCardinalityLimit(eventsTriggerLatencyView, metricConfig.getMetricsOtelCardinalityLimit()); + providerBuilder.registerView(eventsTriggerLatencySelector, eventsTriggerLatencyView.build()); } } diff --git a/pom.xml b/pom.xml index 8df75ef6..b2ccb1b2 100644 --- a/pom.xml +++ b/pom.xml @@ -99,6 +99,7 @@ 1.18.20 1.39.0 1.39.0-alpha + 0.16.0 @@ -357,6 +358,12 @@ ${opentelemetry.exporter.prometheus.version} + + io.prometheus + simpleclient + ${simpleclient.version} + + junit diff --git a/start/pom.xml b/start/pom.xml index 544a9689..bf56c396 100644 --- a/start/pom.xml +++ b/start/pom.xml @@ -88,6 +88,26 @@ com.alibaba fastjson + + org.apache.rocketmq + connect-eventbridge-transform + 1.1.0 + + + org.apache.rocketmq + connect-eventbridge-transform + 1.1.0 + + + org.apache.rocketmq + connect-filter-transform + 1.1.0 + + + org.apache.rocketmq + eventbridge-connect-file + 1.1.0 + diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index 8daf91c5..408b9f37 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -38,4 +38,7 @@ runtime.pluginpath=./plugin ## log app.name=rocketmqeventbridge log.level=INFO -log.path=~/logs \ No newline at end of file +log.path=~/logs + +## metrics +metrics.metrics-exporter-type=PROM \ No newline at end of file diff --git a/supports/connect-eventbridge-transform/pom.xml b/supports/connect-eventbridge-transform/pom.xml index 36b78b8e..358fa405 100644 --- a/supports/connect-eventbridge-transform/pom.xml +++ b/supports/connect-eventbridge-transform/pom.xml @@ -40,7 +40,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure 1.1.0 diff --git a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java index 3b140212..136d602a 100644 --- a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java +++ b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java @@ -22,6 +22,8 @@ import io.openmessaging.connector.api.component.ComponentContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.connector.api.data.SchemaBuilder; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.apache.rocketmq.eventbridge.tools.transform.*; import java.util.Map; @@ -46,9 +48,32 @@ public ConnectRecord doTransform(ConnectRecord record) { record.addExtension(entry.getKey(), ((StringData) data).getData()); } }); + exportMetrics(record); return record; } + private static void exportMetrics(ConnectRecord connectRecord) { + EventBridgeMetricsManager.eventbridgeEventsTransferOutTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + + EventBridgeMetricsManager.eventbridgeEventsTransferLatency.set(1D, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + @Override public void validate(KeyValue config) { diff --git a/supports/connect-filter-transform/pom.xml b/supports/connect-filter-transform/pom.xml index 1ea72933..e565de42 100644 --- a/supports/connect-filter-transform/pom.xml +++ b/supports/connect-filter-transform/pom.xml @@ -47,7 +47,7 @@ org.apache.rocketmq - rocketmq-eventbridge-common + rocketmq-eventbridge-infrastructure ${project.version} diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java index 7105fa7a..fd6b5537 100644 --- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java +++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java @@ -23,6 +23,8 @@ import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.ComponentContext; import io.openmessaging.connector.api.data.ConnectRecord; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant; +import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager; import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluator; import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluatorBuilder; @@ -34,6 +36,7 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap @Override public ConnectRecord doTransform(ConnectRecord record) { + exportMetrics(record); if (!evaluator.evaluateData(new Gson().toJson(record.getData()))) { return null; } else if (!evaluator.evaluateSpecAttr(this.buildSpecAttr(record))) { @@ -45,6 +48,18 @@ public ConnectRecord doTransform(ConnectRecord record) { } } + private static void exportMetrics(ConnectRecord connectRecord) { + EventBridgeMetricsManager.eventbridgeEventsTransferInTotal.inc(1L, + EventBridgeMetricsManager.newAttributesBuilder() + .put(EventBridgeMetricsConstant.LABEL_STATUS, "success") + .put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname")) + .put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type")) + .put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name")) + .put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build()); + } + private Map buildSpecAttr(ConnectRecord record) { Map extensionsAttrs = Maps.newHashMap(); SpecVersion.V1.getAllAttributes() diff --git a/test/rocketmq-eventbridge-e2etest/pom.xml b/test/rocketmq-eventbridge-e2etest/pom.xml index dab2c2c2..d994f6c7 100644 --- a/test/rocketmq-eventbridge-e2etest/pom.xml +++ b/test/rocketmq-eventbridge-e2etest/pom.xml @@ -27,6 +27,16 @@ + + org.jetbrains.kotlin + kotlin-stdlib + 1.3.70 + + + com.squareup.okhttp3 + okhttp + 4.2.0 + org.apache.rocketmq rocketmq-eventbridge-start @@ -67,11 +77,13 @@ connect-filter-transform 1.1.0 + org.apache.rocketmq eventbridge-connect-file 1.1.0 + \ No newline at end of file diff --git a/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java b/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java new file mode 100644 index 00000000..16327bbb --- /dev/null +++ b/test/rocketmq-eventbridge-e2etest/src/test/java/org/apache/rocketmq/eventbridge/e2etest/controller/PostJsonExample.java @@ -0,0 +1,42 @@ +package org.apache.rocketmq.eventbridge.e2etest.controller; + +import okhttp3.*; + +import java.io.IOException; + +public class PostJsonExample { + public static void main(String[] args) throws InterruptedException { + + // 构建 JSON 请求体 + + OkHttpClient client = new OkHttpClient(); + RequestBody body = RequestBody.create(MediaType.parse("application/json"), "A test recrod."); + + // 创建 POST 请求 + Request request = new Request.Builder() + .url("http://localhost:7001/putEvents") + .addHeader("Content-Type", "application/json") + .addHeader("ce-specversion","1.0") + .addHeader("ce-type", "com.github.pull_request.opened") + .addHeader("ce-source", "https://github.com/cloudevents/spec/pull") + .addHeader("ce-subject", "demo") + .addHeader("ce-id", "1234-1234-1234") + .addHeader("ce-datacontenttype", "application/json") + .addHeader("ce-time","2018-04-05T17:31:00Z") + .addHeader("ce-eventbusname", "demo-bus") + .post(body) + .build(); + + // 发送同步 POST 请求 + try (Response response = client.newCall(request).execute()) { + if (response.isSuccessful()) { + System.out.println(response.body().string()); + } else { + System.err.println("Request failed: " + response.code()); + } + } catch (IOException e) { + e.printStackTrace(); + } + } +} +