From dea4c3af7e84b2d21704beaf52bce21e03de2b7e Mon Sep 17 00:00:00 2001 From: Hankunming <1109939087@qq.com> Date: Wed, 15 May 2024 20:03:27 +0800 Subject: [PATCH] fix: batch sink didn't report metrics --- .../flink/connector/rocketmq/legacy/RocketMQSink.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java index 9f87486a..db530f13 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSink.java @@ -64,6 +64,7 @@ public class RocketMQSink extends RichSinkFunction implements Checkpoin private boolean batchFlushOnCheckpoint; // false by default private int batchSize = 32; private List batchList; + private long batchMessageSize; private Meter sinkInTps; private Meter outTps; @@ -112,6 +113,7 @@ public void invoke(Message input, Context context) throws Exception { if (batchFlushOnCheckpoint) { batchList.add(input); + batchMessageSize += input.getBody().length; if (batchList.size() >= batchSize) { flushSync(); } @@ -220,8 +222,13 @@ private void flushSync() throws Exception { if (batchFlushOnCheckpoint) { synchronized (batchList) { if (batchList.size() > 0) { + long startSinkTime = System.currentTimeMillis(); producer.send(batchList); + latencyGauge.report(System.currentTimeMillis() - startSinkTime, 1); + outTps.markEvent(); + outBps.markEvent(batchMessageSize); batchList.clear(); + batchMessageSize = 0; } } }