Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.rocketmq.eventbridge.adapter.runtime.boot;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import io.openmessaging.connector.api.data.ConnectRecord;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
Expand All @@ -33,6 +33,8 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,9 +92,11 @@ public void run() {
TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
curEventRecords.forEach(pullRecord -> {
AtomicReference<Throwable> resultException = new AtomicReference<>();
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
LOGGER.error("transfer do transform event record failed, stackTrace-", exception);
resultException.set(exception);
errorHandler.handle(pullRecord, exception);
return null;
})
Expand All @@ -103,6 +107,7 @@ public void run() {
offsetManager.commit(pullRecord);
}
});
exportMetrics(pullRecord, runnerName, resultException);
completableFutures.add(transformFuture);
});
}
Expand All @@ -117,6 +122,33 @@ public void run() {
}
}

private static void exportMetrics(ConnectRecord connectRecord, String ruleName, AtomicReference<Throwable> resultException) {
String status = "success";
if (resultException.get() != null) {
status = "failed";
resultException.set(null);
}
EventBridgeMetricsManager.observableDoubleGauge.set(1D,
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, status)
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname"))
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build());

EventBridgeMetricsManager.eventbridgeEventRuleLagEventsTotal.inc(1L,
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, status)
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname"))
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, ruleName).build());
}



@Override
public void start() {
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,15 +76,39 @@ public void run() {
try {
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
exportMetrics(triggerRecords.get(0), "success");
} catch (Exception exception) {
LOGGER.error(getServiceName() + " push target exception, stackTrace-", exception);
triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
exportMetrics(triggerRecords.get(0), "failed");
}
});
}
}
}

private static void exportMetrics(ConnectRecord connectRecord, String status) {
EventBridgeMetricsManager.eventbridgeEventsTriggerLatency.update(1D,
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, "success")
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname"))
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name"))
.put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build());

EventBridgeMetricsManager.eventbridgeEventsLatency.set(1D,
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, "success")
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, connectRecord.getExtension("eventbusname"))
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, connectRecord.getExtension("id"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, connectRecord.getExtension("source"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, connectRecord.getExtension("type"))
.put(EventBridgeMetricsConstant.LABEL_EVENT_RULE_NAME, connectRecord.getExtension("runner-name"))
.put(EventBridgeMetricsConstant.LABEL_TRANSFORM_TYPE, "filter").build());
}

@Override
public String getServiceName() {
return EventTargetTrigger.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl;

import lombok.Getter;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback;
Expand All @@ -29,6 +30,9 @@ public class DefaultSendCallback implements SendCallback {

PutEventsResponseEntry entry = new PutEventsResponseEntry();

@Getter
private String status;

public DefaultSendCallback(PutEventCallback putEventCallback) {
this.putEventCallback = putEventCallback;
}
Expand All @@ -37,13 +41,15 @@ public DefaultSendCallback(PutEventCallback putEventCallback) {
public void onSuccess(SendResult sendResult) {
entry.setEventId(sendResult.getMsgId());
entry.setErrorCode(DefaultErrorCode.Success.getCode());
status = DefaultErrorCode.Success.getCode();
putEventCallback.endProcess(entry);
}

@Override
public void onException(Throwable throwable) {
entry.setErrorCode(DefaultErrorCode.InternalError.getCode());
entry.setErrorMessage(throwable.getMessage());
status = DefaultErrorCode.InternalError.getCode();
putEventCallback.endProcess(entry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.rocketmq.eventbridge.domain.storage.EventDataRepository;
import org.apache.rocketmq.eventbridge.event.EventBridgeEvent;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.infrastructure.metric.EventBridgeMetricsManager;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;
Expand Down Expand Up @@ -74,14 +76,34 @@ public boolean putEvent(String accountId, String eventBusName, EventBridgeEvent
PutEventCallback putEventCallback) {
String topicName = this.getTopicName(accountId, eventBusName);
Message msg = eventDataOnRocketMQConnectAPI.converter(accountId, topicName, eventBridgeEvent);
DefaultSendCallback sendCallback = new DefaultSendCallback(putEventCallback);
try {
producer.send(msg, new DefaultSendCallback(putEventCallback), 1000L);
producer.send(msg, sendCallback, 1000L);
exportMetrics(accountId, eventBusName, eventBridgeEvent, sendCallback);
} catch (Throwable e) {
throw new EventBridgeException(EventBridgeErrorCode.InternalError, e);
}
return true;
}

private static void exportMetrics(String accountId, String eventBusName, EventBridgeEvent eventBridgeEvent, DefaultSendCallback sendCallback) {
EventBridgeMetricsManager.eventbridgePutEventsLatency.update(1D,
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus())
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName)
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId)
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString())
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build());

EventBridgeMetricsManager.eventbridgePutEventsSize.update(Double.valueOf(String.valueOf(eventBridgeEvent.getData().length)),
EventBridgeMetricsManager.newAttributesBuilder()
.put(EventBridgeMetricsConstant.LABEL_STATUS, sendCallback.getStatus())
.put(EventBridgeMetricsConstant.LABEL_EVENT_BUS_NAME, eventBusName)
.put(EventBridgeMetricsConstant.LABEL_ACCOUNT_ID, accountId)
.put(EventBridgeMetricsConstant.LABEL_EVENT_SOURCE, eventBridgeEvent.getSource().toString())
.put(EventBridgeMetricsConstant.LABEL_EVENT_TYPE, eventBridgeEvent.getType()).build());
}

@Override
public String getEventBusPersistentContext(String accountId, String eventBusName) {
EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName);
Expand Down
2 changes: 1 addition & 1 deletion domain/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<!-- Project Modules -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-eventbridge-common</artifactId>
<artifactId>rocketmq-eventbridge-infrastructure</artifactId>
</dependency>

<!-- Framework -->
Expand Down
27 changes: 27 additions & 0 deletions infrastructure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,33 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-prometheus</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.rocketmq.eventbridge.infrastructure.metric;


public interface Counter<P1, P2, R> extends Metric<R> {

default void inc(P2 attachment){}

default void inc(P1 n, P2 attachment){}

@Override
default MetricType getMetricType() {
return MetricType.COUNTER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.eventbridge.infrastructure.metric;

import io.opentelemetry.api.common.Attributes;
import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleGauge;


public class DoubleGauge implements Gauge<Double, Attributes, io.opentelemetry.api.metrics.DoubleGauge> {

private io.opentelemetry.api.metrics.DoubleGauge doubleGauge = new NopDoubleGauge();

private final String metricName;

public DoubleGauge(String metricName) {
this.metricName = metricName;
}

@Override
public void set(Double value, Attributes attachment) {
doubleGauge.set(value, attachment);
}

@Override
public io.opentelemetry.api.metrics.DoubleGauge getValue() {
return doubleGauge;
}

@Override
public String getMetricName() {
return this.metricName;
}

@Override
public void setInstrument(io.opentelemetry.api.metrics.DoubleGauge instrument) {
this.doubleGauge = instrument;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.eventbridge.infrastructure.metric;

import io.opentelemetry.api.common.Attributes;
import org.apache.rocketmq.eventbridge.infrastructure.metric.otlp.NopDoubleHistogram;

public class DoubleHistogram implements Histogram<Double, Attributes, io.opentelemetry.api.metrics.DoubleHistogram> {

private io.opentelemetry.api.metrics.DoubleHistogram doubleHistogram = new NopDoubleHistogram();

private final String metricName;

public DoubleHistogram(String metricName) {
this.metricName = metricName;
}

@Override
public void update(Double value, Attributes attachment) {
doubleHistogram.record(value, attachment);
}

@Override
public io.opentelemetry.api.metrics.DoubleHistogram getValue() {
return doubleHistogram;
}

@Override
public String getMetricName() {
return this.metricName;
}

@Override
public void setInstrument(io.opentelemetry.api.metrics.DoubleHistogram instrument) {
this.doubleHistogram = instrument;
}
}
Loading