Description
Background & Motivation
What needs to be done
- Concurrent push
- Allocate a cache queue for each topic. Messages to push can be taken from a randomly chosen topic queue or by polling the topic queues in turn. A possible structure is `ConcurrentHashMap<String, BlockingQueue<ConnectRecord>>`.
- Set a maximum execution time for tasks. A sinkTask that exceeds the allowed execution time should yield its CPU time, and the task should be interrupted if necessary. Put its ConnectRecord into a low-priority queue, and consider discarding the record once it reaches the retry threshold. The consumer offset can be processed at this point.
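
As a rough illustration of the per-topic queue idea, the sketch below keeps one bounded queue per topic in a `ConcurrentHashMap` and polls the topics in rotation. The class name `TopicQueues`, the `capacityPerTopic` parameter, and the rotation strategy are assumptions made for this example, not existing EventBridge code. The record type is left generic so the snippet compiles without `ConnectRecord`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of EventBridge): one bounded queue per topic.
class TopicQueues<T> {
    private final Map<String, BlockingQueue<T>> queues = new ConcurrentHashMap<>();
    private final AtomicInteger cursor = new AtomicInteger();
    private final int capacityPerTopic;

    TopicQueues(int capacityPerTopic) {
        this.capacityPerTopic = capacityPerTopic;
    }

    // Returns false when this topic's queue is full, so one busy topic
    // cannot grow without bound or crowd out the others.
    boolean offer(String topic, T record) {
        return queues
            .computeIfAbsent(topic, t -> new LinkedBlockingQueue<>(capacityPerTopic))
            .offer(record);
    }

    // Visits the topics in rotation, starting one position later on each call,
    // and returns the first record found, or null if every queue is empty.
    T pollNext() {
        List<BlockingQueue<T>> snapshot = new ArrayList<>(queues.values());
        int n = snapshot.size();
        if (n == 0) {
            return null;
        }
        int start = Math.floorMod(cursor.getAndIncrement(), n);
        for (int i = 0; i < n; i++) {
            T record = snapshot.get((start + i) % n).poll();
            if (record != null) {
                return record;
            }
        }
        return null;
    }
}
```

With two topics that both hold records, two consecutive `pollNext()` calls return one record from each topic, so a topic with heavy traffic does not delay a quiet one.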
Why do this
- Improve the message push capability.
- Prevent some tasks from starving because other tasks are busy handling an excessively large data flow.
- Handle the case where a SinkTask's consumption capacity is insufficient.
What benefits it brings
- Improved message distribution capabilities
- Message distribution for each topic is not affected by traffic on other topics
- Avoid blocking caused by a sinkTask that processes messages slowly
How to achieve it
- Add a thread pool to `EventTargetPusher` to execute sinkTask. Connect's `WorkerTask` implementation can serve as a reference.
- Redefine the data structures for `eventRecord` and `targetQueue` in `CirculatorContext`. Consider using `ConcurrentHashMap<String, BlockingQueue>`.
- Implement polling for topic message fetching and checking of task execution status in the `org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher#run` method.
- When there are no messages in the high-priority queue, take the message to push from the low-priority queue.
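
The proposal relies on a task wrapper that exposes its state and start time so the pusher can detect timeouts. The sketch below shows one possible shape for such a wrapper. `TrackedTask`, its nested `State` enum, and the two-argument `transitionTo` are hypothetical names chosen for this example; a real `WorkerSinkTask` would call the sink task inside `run()` instead of a plain `Runnable`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical wrapper (not EventBridge code) that records state and start time.
class TrackedTask implements Runnable {
    enum State { NEW, STARTED, COMPLETED, ERROR, TIMEOUT }

    private final Runnable body;
    private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
    private volatile long startTimestamp;

    TrackedTask(Runnable body) {
        this.body = body;
    }

    @Override
    public void run() {
        startTimestamp = System.currentTimeMillis();
        // If the checker already moved the task out of NEW, do not run it.
        if (!state.compareAndSet(State.NEW, State.STARTED)) {
            return;
        }
        try {
            body.run();
            // Compare-and-set keeps a TIMEOUT set by the checker from being overwritten.
            state.compareAndSet(State.STARTED, State.COMPLETED);
        } catch (RuntimeException e) {
            state.compareAndSet(State.STARTED, State.ERROR);
        }
    }

    // Returns true only if the task was still in the expected state.
    boolean transitionTo(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    State getTaskState() {
        return state.get();
    }

    long getStartTimestamp() {
        return startTimestamp;
    }
}
```

Using compare-and-set for every transition means the worker thread and the checking thread cannot silently overwrite each other's state changes.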
```java
// Tracks tasks submitted to the thread pool so their run state can be checked
// and timed-out tasks can be interrupted.
private final Map<WorkerTask, TaskFuture> taskToFutureMap = new ConcurrentHashMap<>();

public void run() {
    while (!stopped) {
        // Check the execution state of previously submitted tasks.
        checkSubmittedTasks();
        // Prefer the high-priority queue; fall back to the low-priority queue when it is empty.
        ConnectRecord targetRecord = circulatorContext.takeTargetMap();
        if (Objects.isNull(targetRecord)) {
            targetRecord = circulatorContext.takeLowTargetMap();
        }
        if (Objects.isNull(targetRecord)) {
            logger.info("current target pusher is empty");
            this.waitForRunning(1000);
            continue;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord));
        }
        Map<String, SinkTask> latestTaskMap = circulatorContext.getPusherTaskMap();
        String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
        SinkTask sinkTask = latestTaskMap.get(runnerName);
        WorkerSinkTask workerSinkTask = new WorkerSinkTask(sinkTask, targetRecord);
        // Submit to the thread pool and record the submission time.
        Future<?> future = executorService.submit(workerSinkTask);
        TaskFuture taskFuture = new TaskFuture(future, System.currentTimeMillis());
        taskToFutureMap.put(workerSinkTask, taskFuture);
    }
}
```

`checkSubmittedTasks`:
```java
private void checkSubmittedTasks() {
    for (Map.Entry<WorkerTask, TaskFuture> entry : taskToFutureMap.entrySet()) {
        WorkerTask workerTask = entry.getKey();
        TaskFuture taskFuture = entry.getValue();
        long currentTimeMillis = System.currentTimeMillis();
        TaskState state = workerTask.getTaskState();
        Future<?> future = taskFuture.getFuture();
        switch (state) {
            case NEW:
                // Submitted but not yet started.
                break;
            case STARTED:
                if (currentTimeMillis - workerTask.getStartTimestamp() > circulatorContext.getMaxExecuteTimeoutMills()) {
                    workerTask.transitionTo(TaskState.TIMEOUT);
                    // Execution timed out: interrupt the task.
                    future.cancel(true);
                    taskToFutureMap.remove(workerTask);
                    // Move the ConnectRecord to the low-priority queue.
                    circulatorContext.offerLowTargetTaskQueue(workerTask.getConnectRecord());
                }
                break;
            case ERROR:
                future.cancel(true);
                taskToFutureMap.remove(workerTask);
                // Move the ConnectRecord to the low-priority queue.
                circulatorContext.offerLowTargetTaskQueue(workerTask.getConnectRecord());
                break;
            case TIMEOUT:
                taskToFutureMap.remove(workerTask);
                // TODO: consider discarding the record, sending it to a dead-letter queue,
                // or persisting it to a DB for auditing or manual re-push by operators.
                break;
            case COMPLETED:
                // Exited normally.
                taskToFutureMap.remove(workerTask);
                break;
            default:
                logger.error("[BUG] Illegal state when checking submitted tasks, {} is in {} state",
                    workerTask, state);
                break;
        }
    }
}
```
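
The proposal mentions discarding a record once it reaches the retry threshold, but the snippets above do not show that step. A minimal sketch of one way to do it follows. `LowPriorityQueue`, `maxRetries`, `offerForRetry`, and `markDelivered` are hypothetical names, and the sketch assumes records can be used as map keys (meaningful `equals` and `hashCode`); a real implementation might key on a message ID instead.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a low-priority queue that gives up on a record
// after it has been requeued more than maxRetries times.
class LowPriorityQueue<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Map<T, Integer> attempts = new ConcurrentHashMap<>();
    private final int maxRetries;

    LowPriorityQueue(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Returns true if the record was requeued, false if it exhausted its
    // retries and was dropped (the caller may dead-letter or persist it).
    boolean offerForRetry(T record) {
        int count = attempts.merge(record, 1, Integer::sum);
        if (count > maxRetries) {
            attempts.remove(record);
            return false;
        }
        return queue.offer(record);
    }

    // Returns the next record to retry, or null if the queue is empty.
    T poll() {
        return queue.poll();
    }

    // Call after a successful push so the retry counter does not leak.
    void markDelivered(T record) {
        attempts.remove(record);
    }
}
```

In this sketch the `ERROR` and timeout branches would call `offerForRetry` instead of requeuing unconditionally, and a `false` result would hand the record to whatever discard or dead-letter handling is chosen.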