From 5cf05d6bd96e87b9b6c3f9b314b689f9d303da85 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Wed, 8 Apr 2026 19:52:30 +0800 Subject: [PATCH 1/3] [AURON #2185] Integration with Flink metrics in AuronKafkaSourceFunction --- .../connector/kafka/AuronKafkaSourceFunction.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index a9bc7c893..ba2fb466c 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.flink.metrics.Counter; import org.apache.auron.flink.arrow.FlinkArrowReader; import org.apache.auron.flink.arrow.FlinkArrowUtils; import org.apache.auron.flink.configuration.FlinkAuronConfiguration; @@ -117,6 +118,7 @@ public class AuronKafkaSourceFunction extends RichParallelSourceFunction sourceContext) throws Exception { + metricGroup = getRuntimeContext().getMetricGroup(); + final Map flinkCounters = new HashMap<>(); + nativeMetric = new MetricNode(new ArrayList<>()) { @Override public void add(String name, long value) { - // TODO Integration with Flink metrics - LOG.info("Metric Auron Source: {} = {}", name, value); + // Integration with Flink metrics + Counter counter = flinkCounters.get(name); + if (counter == null) { + counter = metricGroup.counter(name); + flinkCounters.put(name, counter); + } + counter.inc(value); + LOG.debug("Metric Auron Source: {} = {}", name, value); } }; List fieldList = new LinkedList<>(); From 45a3522da6ea85939bbbc615c5c3c657b3b4ac2b Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Thu, 9 Apr 2026 11:16:00 +0800 Subject: [PATCH 2/3] fix styles --- .../auron/flink/connector/kafka/AuronKafkaSourceFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index ba2fb466c..7ae69eb97 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -286,7 +286,7 @@ public void open(Configuration config) throws Exception { public void run(SourceContext sourceContext) throws Exception { metricGroup = getRuntimeContext().getMetricGroup(); final Map flinkCounters = new HashMap<>(); - + nativeMetric = new MetricNode(new ArrayList<>()) { @Override public void add(String name, long value) { From 66d52616eb7197a1286585c4a47205aa2f01dd67 Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Thu, 9 Apr 2026 11:31:48 +0800 Subject: [PATCH 3/3] fix styles --- .../auron/flink/connector/kafka/AuronKafkaSourceFunction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java index 7ae69eb97..178f611e2 100644 --- a/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java +++ b/auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/connector/kafka/AuronKafkaSourceFunction.java @@ -24,7 +24,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.flink.metrics.Counter; import org.apache.auron.flink.arrow.FlinkArrowReader; import org.apache.auron.flink.arrow.FlinkArrowUtils; import org.apache.auron.flink.configuration.FlinkAuronConfiguration; @@ -51,6 +50,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;