Skip to content
Open
6 changes: 6 additions & 0 deletions adapter/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq.eventbridge</groupId>
<artifactId>rocketmq-eventbridge-metrics</artifactId>
<version>1.0.0</version>
<scope>compile</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.rocketmq.eventbridge.adapter.runtime;

import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventBusListener;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventMonitor;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventRuleTransfer;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.EventTargetTrigger;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
Expand Down Expand Up @@ -69,9 +71,13 @@ public void initAndStart() throws Exception {
circulatorContext.initCirculatorContext(runnerConfigObserver.getTargetRunnerConfig());
runnerConfigObserver.registerListener(circulatorContext);
runnerConfigObserver.registerListener(eventSubscriber);
EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler);
EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler);
EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler);

BridgeMetricsManager metricsManager = new BridgeMetricsManager();
EventMonitor eventMonitor = new EventMonitor(metricsManager);
EventBusListener eventBusListener = new EventBusListener(circulatorContext, eventSubscriber, errorHandler, metricsManager);
EventRuleTransfer eventRuleTransfer = new EventRuleTransfer(circulatorContext, offsetManager, errorHandler, metricsManager);
EventTargetTrigger eventTargetPusher = new EventTargetTrigger(circulatorContext, offsetManager, errorHandler, metricsManager);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventMonitor);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventBusListener);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventRuleTransfer);
RUNTIME_START_AND_SHUTDOWN.appendStartAndShutdown(eventTargetPusher);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,21 @@

import com.google.common.collect.Lists;
import io.openmessaging.connector.api.data.ConnectRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,32 +49,46 @@ public class EventBusListener extends ServiceThread {
private final CirculatorContext circulatorContext;
private final EventSubscriber eventSubscriber;
private final ErrorHandler errorHandler;
private BridgeMetricsManager metricsManager;

public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber,
ErrorHandler errorHandler) {
ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.eventSubscriber = eventSubscriber;
this.errorHandler = errorHandler;
this.metricsManager = metricsManager;
}

@Override
public void run() {
while (!stopped) {
List<ConnectRecord> pullRecordList = Lists.newArrayList();
try {
pullRecordList = eventSubscriber.pull();
pullRecordList = Optional.ofNullable(eventSubscriber.pull()).orElse(new ArrayList<>());

for (ConnectRecord connectRecord : pullRecordList) {
String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName);
metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name(), 1);
}
if (CollectionUtils.isEmpty(pullRecordList)) {
this.waitForRunning(1000);
continue;
}
circulatorContext.offerEventRecords(pullRecordList);
} catch (Exception exception) {
logger.error(getServiceName() + " - event bus pull record exception, stackTrace - ", exception);
pullRecordList.forEach(pullRecord -> errorHandler.handle(pullRecord, exception));
pullRecordList.forEach(pullRecord -> {
String runnerName = pullRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
String accountId = RunnerUtil.getAccountId(circulatorContext, pullRecord);
metricsManager.eventbusInEventsTotal(runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name(), 1);
errorHandler.handle(pullRecord, exception);
});
}
}
}


@Override
public String getServiceName() {
return EventBusListener.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.eventbridge.adapter.runtime.boot;

import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;

public class EventMonitor extends ServiceThread {

private BridgeMetricsManager bridgeMetricsManager;

public EventMonitor(BridgeMetricsManager metricsManager) {
this.bridgeMetricsManager = metricsManager;
}
@Override
public String getServiceName() {
return EventMonitor.class.getSimpleName();
}

@Override
public void run() {
bridgeMetricsManager.init();
}

@Override
public void shutdown() {
bridgeMetricsManager.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.annotation.PostConstruct;
import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,12 +53,14 @@ public class EventRuleTransfer extends ServiceThread {
private final CirculatorContext circulatorContext;
private final OffsetManager offsetManager;
private final ErrorHandler errorHandler;
private BridgeMetricsManager metricsManager;

public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager,
ErrorHandler errorHandler) {
ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
this.metricsManager = metricsManager;
}

@Override
Expand Down Expand Up @@ -87,20 +94,31 @@ public void run() {
afterTransformConnect.clear();
List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
for (String runnerName : eventRecordMap.keySet()) {
TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName);
List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName);
curEventRecords.forEach(pullRecord -> {
long startTime = System.currentTimeMillis();
CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord))
.exceptionally((exception) -> {
logger.error("transfer do transform event record failed,stackTrace-", exception);
//failed
metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime);
metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.FAILED.name(), 1);
errorHandler.handle(pullRecord, exception);
return null;
})
.thenAccept(pushRecord -> {
if (Objects.nonNull(pushRecord)) {
afterTransformConnect.add(pushRecord);
metricsManager.eventRuleLatencySeconds(RunnerUtil.getRunnerName(pushRecord), RunnerUtil.getAccountId(circulatorContext, pushRecord),
BridgeMetricsConstant.Status.FAILED.name(), System.currentTimeMillis() - startTime);

} else {
offsetManager.commit(pullRecord);
// success
metricsManager.eventruleFilterEventsTotal(runnerName, runnerConfig.getAccountId(),BridgeMetricsConstant.Status.SUCCESS.name(), 1);
metricsManager.eventRuleLatencySeconds(runnerName, runnerConfig.getAccountId(), BridgeMetricsConstant.Status.SUCCESS.name(),System.currentTimeMillis() - startTime);
}
});
completableFutures.add(transformFuture);
Expand All @@ -110,8 +128,11 @@ public void run() {
circulatorContext.offerTargetTaskQueue(afterTransformConnect);
logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect));
} catch (Exception exception) {
//failed
logger.error("transfer event record failed, stackTrace-", exception);
afterTransformConnect.forEach(transferRecord -> errorHandler.handle(transferRecord, exception));
afterTransformConnect.forEach(transferRecord -> {
errorHandler.handle(transferRecord, exception);
});
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import java.util.concurrent.ExecutorService;

import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.eventbridge.BridgeMetricsConstant;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.ExceptionUtil;
import org.apache.rocketmq.eventbridge.adapter.runtime.utils.RunnerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -47,12 +50,14 @@ public class EventTargetTrigger extends ServiceThread {
private final OffsetManager offsetManager;
private final ErrorHandler errorHandler;
private volatile Integer batchSize = 100;
private BridgeMetricsManager metricsManager;

public EventTargetTrigger(CirculatorContext circulatorContext, OffsetManager offsetManager,
ErrorHandler errorHandler) {
ErrorHandler errorHandler, BridgeMetricsManager metricsManager) {
this.circulatorContext = circulatorContext;
this.offsetManager = offsetManager;
this.errorHandler = errorHandler;
this.metricsManager = metricsManager;
}

@Override
Expand All @@ -69,22 +74,30 @@ public void run() {
}

for(String runnerName: targetRecordMap.keySet()){
long startTime = System.currentTimeMillis();
String accountId = RunnerUtil.getAccountId(circulatorContext,runnerName);
ExecutorService executorService = circulatorContext.getExecutorService(runnerName);
executorService.execute(() -> {
SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
List<ConnectRecord> triggerRecords = targetRecordMap.get(runnerName);
try {
sinkTask.put(triggerRecords);
offsetManager.commit(triggerRecords);
metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.SUCCESS.name() );
} catch (Exception exception) {
logger.error(getServiceName() + " push target exception, stackTrace-", exception);
triggerRecords.forEach(triggerRecord -> errorHandler.handle(triggerRecord, exception));
triggerRecords.forEach(triggerRecord -> {
errorHandler.handle(triggerRecord, exception);
metricsManager.countLatencyStat(System.currentTimeMillis() - startTime, BridgeMetricsConstant.EVENTRULE_TRIGGER_LATENCY, runnerName, accountId, BridgeMetricsConstant.Status.FAILED.name() );

});
}
});
}
}
}


@Override
public String getServiceName() {
return EventTargetTrigger.class.getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener;

import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.TargetRunnerListener;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.eventbridge.adapter.runtime.utils;

import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.TargetRunnerConfig;
import org.apache.rocketmq.eventbridge.adapter.runtime.config.RuntimeConfigDefine;

public class RunnerUtil {

public static String getAccountId(CirculatorContext circulatorContext, ConnectRecord connectRecord) {
String runnerName = connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
return runnerConfig.getAccountId();
}


public static String getAccountId(CirculatorContext circulatorContext, String runnerName) {
TargetRunnerConfig runnerConfig = circulatorContext.getRunnerConfig(runnerName);
return runnerConfig.getAccountId();
}

public static String getRunnerName(ConnectRecord connectRecord) {
return connectRecord.getExtension(RuntimeConfigDefine.RUNNER_NAME);
}
}
5 changes: 2 additions & 3 deletions adapter/runtime/src/main/resources/runtime.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ rocketmq.namesrvAddr=localhost:9876
rocketmq.consumer.pullTimeOut = 3000
rocketmq.consumer.pullBatchSize=20
rocketmq.cluster.name=DefaultCluster
rocketmq.consumerGroup=default
## runtime
rumtimer.name=eventbridge-runtimer
runtimer.pluginpath=/Users/Local/eventbridge/plugin
runtimer.storePathRootDir=/Users/Local/eventbridge/store
rumtime.name=eventbridge-runtimer
## listener
listener.eventQueue.threshold=50000
listener.targetQueue.threshold=50000
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.eventbridge.BridgeConfig;
import org.apache.rocketmq.eventbridge.BridgeMetricsManager;
import org.apache.rocketmq.eventbridge.adapter.runtime.boot.listener.EventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtime.common.entity.SubscribeRunnerKeys;
Expand Down Expand Up @@ -85,6 +88,7 @@ public class RocketMQEventSubscriber extends EventSubscriber {
private Integer pullTimeOut;
private Integer pullBatchSize;

private BridgeConfig bridgeConfig;
private ClientConfig clientConfig;
private SessionCredentials sessionCredentials;
private String socksProxy;
Expand Down
Loading